From 72e424b2983cde38c644bec3231736dd36a7b0ec Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Tue, 28 May 2024 17:53:04 +0800 Subject: [PATCH] KAFKA-16796: Introduce new org.apache.kafka.tools.api.Decoder to replace kafka.serializer.Decoder Signed-off-by: PoAn Yang --- .../main/scala/kafka/serializer/Decoder.scala | 7 ++- .../scala/kafka/tools/DumpLogSegments.scala | 54 ++++++++++++++++--- .../kafka/tools/DumpLogSegmentsTest.scala | 32 ++++++++++- .../org/apache/kafka/tools/api/Decoder.java | 25 +++++++++ .../kafka/tools/api/DefaultDecoder.java | 27 ++++++++++ .../kafka/tools/api/IntegerDecoder.java | 29 ++++++++++ .../apache/kafka/tools/api/LongDecoder.java | 29 ++++++++++ .../apache/kafka/tools/api/StringDecoder.java | 29 ++++++++++ 8 files changed, 222 insertions(+), 10 deletions(-) create mode 100644 tools/tools-api/src/main/java/org/apache/kafka/tools/api/Decoder.java create mode 100644 tools/tools-api/src/main/java/org/apache/kafka/tools/api/DefaultDecoder.java create mode 100644 tools/tools-api/src/main/java/org/apache/kafka/tools/api/IntegerDecoder.java create mode 100644 tools/tools-api/src/main/java/org/apache/kafka/tools/api/LongDecoder.java create mode 100644 tools/tools-api/src/main/java/org/apache/kafka/tools/api/StringDecoder.java diff --git a/core/src/main/scala/kafka/serializer/Decoder.scala b/core/src/main/scala/kafka/serializer/Decoder.scala index ce166689cfa6e..a0220a83913d5 100644 --- a/core/src/main/scala/kafka/serializer/Decoder.scala +++ b/core/src/main/scala/kafka/serializer/Decoder.scala @@ -27,13 +27,15 @@ import kafka.utils.VerifiableProperties * An implementation is required to provide a constructor that * takes a VerifiableProperties instance. */ -trait Decoder[T] { +@deprecated(since = "3.8.0") +trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T } /** * The default implementation does nothing, just returns the same byte array it takes in. */ +@deprecated(since = "3.8.0") class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[Byte]] { def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes } @@ -42,6 +44,7 @@ class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[B * The string decoder translates bytes into strings. It uses UTF8 by default but takes * an optional property serializer.encoding to control this. */ +@deprecated(since = "3.8.0") class StringDecoder(props: VerifiableProperties = null) extends Decoder[String] { val encoding: String = if (props == null) @@ -57,6 +60,7 @@ class StringDecoder(props: VerifiableProperties = null) extends Decoder[String] /** * The long decoder translates bytes into longs. */ +@deprecated(since = "3.8.0") class LongDecoder(props: VerifiableProperties = null) extends Decoder[Long] { def fromBytes(bytes: Array[Byte]): Long = { ByteBuffer.wrap(bytes).getLong @@ -66,6 +70,7 @@ class LongDecoder(props: VerifiableProperties = null) extends Decoder[Long] { /** * The integer decoder translates bytes into integers. */ +@deprecated(since = "3.8.0") class IntegerDecoder(props: VerifiableProperties = null) extends Decoder[Integer] { def fromBytes(bytes: Array[Byte]): Integer = { ByteBuffer.wrap(bytes).getInt() diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index a3f97d5fb9e5a..a4777f98541d3 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -23,9 +23,8 @@ import java.io._ import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode} import kafka.coordinator.transaction.TransactionLog import kafka.log._ -import kafka.serializer.Decoder -import kafka.utils._ import kafka.utils.Implicits._ +import kafka.utils.{CoreUtils, VerifiableProperties} import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.message.ConsumerProtocolAssignment import org.apache.kafka.common.message.ConsumerProtocolAssignmentJsonConverter @@ -47,6 +46,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapDirectory import org.apache.kafka.snapshot.Snapshots import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.kafka.storage.internals.log.{CorruptSnapshotException, LogFileUtils, OffsetIndex, ProducerStateManager, TimeIndex, TransactionIndex} +import org.apache.kafka.tools.api.{Decoder, DefaultDecoder, IntegerDecoder, LongDecoder, StringDecoder} import java.nio.ByteBuffer import scala.jdk.CollectionConverters._ @@ -604,14 +604,14 @@ object DumpLogSegments { .ofType(classOf[java.lang.Integer]) .defaultsTo(Integer.MAX_VALUE) private val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration. Automatically set if print-data-log is enabled.") - private val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.") + private val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement org.apache.kafka.tools.api.Decoder trait. Custom jar should be available in kafka/libs directory.") .withOptionalArg() .ofType(classOf[java.lang.String]) - .defaultsTo("kafka.serializer.StringDecoder") - private val keyDecoderOpt = parser.accepts("key-decoder-class", "if set, used to deserialize the keys. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.") + .defaultsTo(classOf[StringDecoder].getName) + private val keyDecoderOpt = parser.accepts("key-decoder-class", "if set, used to deserialize the keys. This class should implement org.apache.kafka.tools.api.Decoder trait. Custom jar should be available in kafka/libs directory.") .withOptionalArg() .ofType(classOf[java.lang.String]) - .defaultsTo("kafka.serializer.StringDecoder") + .defaultsTo(classOf[StringDecoder].getName) private val offsetsOpt = parser.accepts("offsets-decoder", "if set, log data will be parsed as offset data from the " + "__consumer_offsets topic.") private val transactionLogOpt = parser.accepts("transaction-log-decoder", "if set, log data will be parsed as " + @@ -628,8 +628,8 @@ object DumpLogSegments { } else if (options.has(clusterMetadataOpt)) { new ClusterMetadataLogMessageParser } else { - val valueDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties) - val keyDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties) + val valueDecoder = newDecoder(options.valueOf(valueDecoderOpt)) + val keyDecoder = newDecoder(options.valueOf(keyDecoderOpt)) new DecoderMessageParser(keyDecoder, valueDecoder) } @@ -651,4 +651,42 @@ object DumpLogSegments { def checkArgs(): Unit = CommandLineUtils.checkRequiredArgs(parser, options, filesOpt) } + + /* + * The kafka.serializer.Decoder is deprecated in 3.8.0. This method is used to transfer the deprecated + * decoder to the new org.apache.kafka.tools.api.Decoder. Old decoders have an input VerifiableProperties. + * Remove it in new interface since it's always empty. + */ + private[tools] def newDecoder(className: String): Decoder[_] = { + try { + CoreUtils.createObject[org.apache.kafka.tools.api.Decoder[_]](convertDeprecatedDecoderClass(className)) + } catch { + case _: Exception => + // Old decoders always have an default VerifiableProperties input, because DumpLogSegments didn't provide + // any way to pass custom configs. + val decoder = CoreUtils.createObject[kafka.serializer.Decoder[_]](className, new VerifiableProperties()) + (bytes: Array[Byte]) => decoder.fromBytes(bytes) + } + } + + /* + * Covert deprecated decoder implementation to new decoder class. + */ + private[tools] def convertDeprecatedDecoderClass(className: String): String = { + if (className == "kafka.serializer.StringDecoder") { + println("kafka.serializer.StringDecoder is deprecated. Please use org.apache.kafka.tools.api.StringDecoder instead") + classOf[StringDecoder].getName + } else if (className == "kafka.serializer.LongDecoder") { + println("kafka.serializer.LongDecoder is deprecated. Please use org.apache.kafka.tools.api.LongDecoder instead") + classOf[LongDecoder].getName + } else if (className == "kafka.serializer.IntegerDecoder") { + println("kafka.serializer.IntegerDecoder is deprecated. Please use org.apache.kafka.tools.api.IntegerDecoder instead") + classOf[IntegerDecoder].getName + } else if (className == "kafka.serializer.DefaultDecoder") { + println("kafka.serializer.DefaultDecoder is deprecated. Please use org.apache.kafka.tools.api.DefaultDecoder instead") + classOf[DefaultDecoder].getName + } else { + className + } + } } diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index de7736973b2b4..b8764f5fae3d0 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -28,7 +28,7 @@ import kafka.log.{LogTestUtils, UnifiedLog} import kafka.raft.{KafkaMetadataLog, MetadataLogConfig} import kafka.server.{BrokerTopicStats, KafkaRaftServer} import kafka.tools.DumpLogSegments.{OffsetsMessageParser, TimeIndexDumpErrors} -import kafka.utils.TestUtils +import kafka.utils.{TestUtils, VerifiableProperties} import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.{Assignment, Subscription} import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.{TopicPartition, Uuid} @@ -598,6 +598,26 @@ class DumpLogSegmentsTest { ) } + @Test + def testNewDecoder(): Unit = { + // Decoder translate should pass without exception + DumpLogSegments.newDecoder(classOf[DumpLogSegmentsTest.TestDecoder].getName) + DumpLogSegments.newDecoder(classOf[kafka.serializer.DefaultDecoder].getName) + assertThrows(classOf[Exception], () => DumpLogSegments.newDecoder(classOf[DumpLogSegmentsTest.TestDecoderWithoutVerifiableProperties].getName)) + } + + @Test + def testConvertDeprecatedDecoderClass(): Unit = { + assertEquals(classOf[org.apache.kafka.tools.api.DefaultDecoder].getName, DumpLogSegments.convertDeprecatedDecoderClass( + classOf[kafka.serializer.DefaultDecoder].getName)) + assertEquals(classOf[org.apache.kafka.tools.api.IntegerDecoder].getName, DumpLogSegments.convertDeprecatedDecoderClass( + classOf[kafka.serializer.IntegerDecoder].getName)) + assertEquals(classOf[org.apache.kafka.tools.api.LongDecoder].getName, DumpLogSegments.convertDeprecatedDecoderClass( + classOf[kafka.serializer.LongDecoder].getName)) + assertEquals(classOf[org.apache.kafka.tools.api.StringDecoder].getName, DumpLogSegments.convertDeprecatedDecoderClass( + classOf[kafka.serializer.StringDecoder].getName)) + } + private def readBatchMetadata(lines: util.ListIterator[String]): Option[String] = { while (lines.hasNext) { val line = lines.next() @@ -732,3 +752,13 @@ class DumpLogSegmentsTest { } } } + +object DumpLogSegmentsTest { + class TestDecoder(props: VerifiableProperties) extends kafka.serializer.Decoder[Array[Byte]] { + override def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes + } + + class TestDecoderWithoutVerifiableProperties() extends kafka.serializer.Decoder[Array[Byte]] { + override def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes + } +} \ No newline at end of file diff --git a/tools/tools-api/src/main/java/org/apache/kafka/tools/api/Decoder.java b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/Decoder.java new file mode 100644 index 0000000000000..f3bb9eb47fb58 --- /dev/null +++ b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/Decoder.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.api; + +/** + * A decoder is a method of turning byte arrays into objects. + */ +@FunctionalInterface +public interface Decoder { + T fromBytes(byte[] bytes); +} diff --git a/tools/tools-api/src/main/java/org/apache/kafka/tools/api/DefaultDecoder.java b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/DefaultDecoder.java new file mode 100644 index 0000000000000..e72aaac2e6c95 --- /dev/null +++ b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/DefaultDecoder.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.api; + +/** + * The default implementation does nothing, just returns the same byte array it takes in. + */ +public class DefaultDecoder implements Decoder { + @Override + public byte[] fromBytes(byte[] bytes) { + return bytes; + } +} diff --git a/tools/tools-api/src/main/java/org/apache/kafka/tools/api/IntegerDecoder.java b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/IntegerDecoder.java new file mode 100644 index 0000000000000..7e20140927298 --- /dev/null +++ b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/IntegerDecoder.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.api; + +import java.nio.ByteBuffer; + +/** + * The integer decoder translates bytes into integers. + */ +public class IntegerDecoder implements Decoder { + @Override + public Integer fromBytes(byte[] bytes) { + return ByteBuffer.wrap(bytes).getInt(); + } +} diff --git a/tools/tools-api/src/main/java/org/apache/kafka/tools/api/LongDecoder.java b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/LongDecoder.java new file mode 100644 index 0000000000000..8cdb4bbf6bb36 --- /dev/null +++ b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/LongDecoder.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.api; + +import java.nio.ByteBuffer; + +/** + * The long decoder translates bytes into longs. + */ +public class LongDecoder implements Decoder { + @Override + public Long fromBytes(byte[] bytes) { + return ByteBuffer.wrap(bytes).getLong(); + } +} diff --git a/tools/tools-api/src/main/java/org/apache/kafka/tools/api/StringDecoder.java b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/StringDecoder.java new file mode 100644 index 0000000000000..eb6b8ddff8a63 --- /dev/null +++ b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/StringDecoder.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.api; + +import java.nio.charset.StandardCharsets; + +/** + * The string decoder translates bytes into strings. It uses UTF8 by default. + */ +public class StringDecoder implements Decoder { + @Override + public String fromBytes(byte[] bytes) { + return new String(bytes, StandardCharsets.UTF_8); + } +}