From d413891829dc0749b326ea211d2760c42a070561 Mon Sep 17 00:00:00 2001 From: Frederik Rouleau Date: Tue, 28 May 2024 23:56:47 +0200 Subject: [PATCH 1/2] KAFKA-16507 Add KeyDeserializationException and ValueDeserializationException with record content (#15691) Implements KIP-1036. Add raw ConsumerRecord data to RecordDeserializationException to make DLQ implementation easier. Reviewers: Kirk True, Andrew Schofield, Matthias J. Sax --- checkstyle/import-control.xml | 5 ++ .../consumer/internals/CompletedFetch.java | 47 ++++++++---- .../RecordDeserializationException.java | 76 ++++++++++++++++++- .../internals/CompletedFetchTest.java | 32 +++++++- 4 files changed, 141 insertions(+), 19 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 9ef9c7f6fe3d7..8e8d2201ae1af 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -207,6 +207,11 @@ + + + + + diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java index 8959345bffdcf..5244af9c827d5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.errors.RecordDeserializationException.DeserializationExceptionOrigin; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -311,25 +312,39 @@ ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers, Optional<Integer> leaderEpoch, TimestampType timestampType, Record record) { + ByteBuffer keyBytes = record.key(); + ByteBuffer valueBytes = record.value(); + Headers headers = new RecordHeaders(record.headers()); + K key; + V value; try { - long offset = record.offset(); - long timestamp = record.timestamp(); - Headers headers = new RecordHeaders(record.headers()); - ByteBuffer keyBytes = record.key(); - K key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes); - ByteBuffer valueBytes = record.value(); - V value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes); - return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, - timestamp, timestampType, - keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(), - valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(), - key, value, headers, leaderEpoch); + key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes); } catch (RuntimeException e) { - log.error("Deserializers with error: {}", deserializers); - throw new RecordDeserializationException(partition, record.offset(), - "Error deserializing key/value for partition " + partition + - " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e); + log.error("Key Deserializers with error: {}", deserializers); + throw newRecordDeserializationException(DeserializationExceptionOrigin.KEY, partition, timestampType, record, e, headers); } + try { + value = valueBytes == null ?
null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes); + } catch (RuntimeException e) { + log.error("Value Deserializers with error: {}", deserializers); + throw newRecordDeserializationException(DeserializationExceptionOrigin.VALUE, partition, timestampType, record, e, headers); + } + return new ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(), + record.timestamp(), timestampType, + keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(), + valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(), + key, value, headers, leaderEpoch); + } + + private static RecordDeserializationException newRecordDeserializationException(DeserializationExceptionOrigin origin, + TopicPartition partition, + TimestampType timestampType, + Record record, + RuntimeException e, + Headers headers) { + return new RecordDeserializationException(origin, partition, record.offset(), record.timestamp(), timestampType, record.key(), record.value(), headers, + "Error deserializing " + origin.name() + " for partition " + partition + " at offset " + record.offset() + + ". If needed, please seek past the record to continue consumption.", e); } private Optional maybeLeaderEpoch(int leaderEpoch) { diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java b/clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java index a15df6c7ff52c..aee57c47d28de 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java @@ -16,7 +16,12 @@ */ package org.apache.kafka.common.errors; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.record.TimestampType; + +import java.nio.ByteBuffer; /** * This exception is raised for any error that occurs while deserializing records received by the consumer using @@ -24,14 +29,61 @@ */ public class RecordDeserializationException extends SerializationException { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; + + public enum DeserializationExceptionOrigin { + KEY, + VALUE + } + + private final DeserializationExceptionOrigin origin; private final TopicPartition partition; private final long offset; + private final TimestampType timestampType; + private final long timestamp; + private final ByteBuffer keyBuffer; + private final ByteBuffer valueBuffer; + private final Headers headers; - public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) { + @Deprecated + public RecordDeserializationException(TopicPartition partition, + long offset, + String message, + Throwable cause) { super(message, cause); + this.origin = null; this.partition = partition; this.offset = offset; + this.timestampType = TimestampType.NO_TIMESTAMP_TYPE; + this.timestamp = ConsumerRecord.NO_TIMESTAMP; + this.keyBuffer = null; + this.valueBuffer = null; + this.headers = null; + } + + public RecordDeserializationException(DeserializationExceptionOrigin origin, + TopicPartition partition, + long offset, + long timestamp, + TimestampType timestampType, + ByteBuffer keyBuffer, + ByteBuffer valueBuffer, + Headers headers, + String message, + Throwable cause) { + super(message, cause); + this.origin = origin; + 
this.offset = offset; + this.timestampType = timestampType; + this.timestamp = timestamp; + this.partition = partition; + this.keyBuffer = keyBuffer; + this.valueBuffer = valueBuffer; + this.headers = headers; + } + + public DeserializationExceptionOrigin origin() { + return origin; } public TopicPartition topicPartition() { @@ -41,4 +93,24 @@ public TopicPartition topicPartition() { public long offset() { return offset; } + + public TimestampType timestampType() { + return timestampType; + } + + public long timestamp() { + return timestamp; + } + + public ByteBuffer keyBuffer() { + return keyBuffer; + } + + public ByteBuffer valueBuffer() { + return valueBuffer; + } + + public Headers headers() { + return headers; + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java index 2c39f4411febe..43d94deaebb60 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; @@ -41,6 +43,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; @@ -48,7 +51,9 @@ import java.util.List; import java.util.UUID; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; public class CompletedFetchTest { @@ -161,6 +166,10 @@ public void testCorruptedMessage() { final UUIDSerializer serializer = new UUIDSerializer()) { builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, UUID.randomUUID()))); builder.append(0L, "key".getBytes(), "value".getBytes()); + builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, UUID.randomUUID()))); + Headers headers = new RecordHeaders(); + headers.add("hkey", "hvalue".getBytes()); + builder.append(10L, serializer.serialize("key", UUID.randomUUID()), "otherValue".getBytes(), headers.toArray()); Records records = builder.build(); FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData() @@ -176,8 +185,29 @@ public void testCorruptedMessage() { completedFetch.fetchRecords(fetchConfig, deserializers, 10); - assertThrows(RecordDeserializationException.class, + RecordDeserializationException thrown = assertThrows(RecordDeserializationException.class, () -> completedFetch.fetchRecords(fetchConfig, deserializers, 10)); + assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.KEY, thrown.origin()); + assertEquals(1, thrown.offset()); + assertEquals(TOPIC_NAME, thrown.topicPartition().topic()); + assertEquals(0, thrown.topicPartition().partition()); + assertEquals(0, thrown.timestamp()); + 
assertArrayEquals("key".getBytes(), Utils.toNullableArray(thrown.keyBuffer())); + assertArrayEquals("value".getBytes(), Utils.toNullableArray(thrown.valueBuffer())); + assertEquals(0, thrown.headers().toArray().length); + + CompletedFetch completedFetch2 = newCompletedFetch(2, partitionData); + completedFetch2.fetchRecords(fetchConfig, deserializers, 10); + RecordDeserializationException valueThrown = assertThrows(RecordDeserializationException.class, + () -> completedFetch2.fetchRecords(fetchConfig, deserializers, 10)); + assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, valueThrown.origin()); + assertEquals(3, valueThrown.offset()); + assertEquals(TOPIC_NAME, valueThrown.topicPartition().topic()); + assertEquals(0, valueThrown.topicPartition().partition()); + assertEquals(10L, valueThrown.timestamp()); + assertNotNull(valueThrown.keyBuffer()); + assertArrayEquals("otherValue".getBytes(), Utils.toNullableArray(valueThrown.valueBuffer())); + assertEquals(headers, valueThrown.headers()); } } } From 662b9e0f9364fe96fed691a5a45598c6e2f2712c Mon Sep 17 00:00:00 2001 From: ShivsundarR Date: Mon, 24 Jun 2024 16:58:01 +0530 Subject: [PATCH 2/2] Updated ShareCompletedFetch --- .../internals/ShareCompletedFetch.java | 47 ++++++++++++------- .../internals/ShareCompletedFetchTest.java | 34 +++++++++++++- 2 files changed, 63 insertions(+), 18 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java index e34ec76a0e32a..712fc6f85bad0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.SerializationException; @@ -286,25 +287,39 @@ ConsumerRecord parseRecord(final Deserializers deserializers, final TimestampType timestampType, final Record record, final short deliveryCount) { + Headers headers = new RecordHeaders(record.headers()); + ByteBuffer keyBytes = record.key(); + ByteBuffer valueBytes = record.value(); + K key; + V value; try { - long offset = record.offset(); - long timestamp = record.timestamp(); - Headers headers = new RecordHeaders(record.headers()); - ByteBuffer keyBytes = record.key(); - K key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes); - ByteBuffer valueBytes = record.value(); - V value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes); - return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, - timestamp, timestampType, - keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(), - valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(), - key, value, headers, leaderEpoch, Optional.of(deliveryCount)); + key = keyBytes == null ? 
null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes); } catch (RuntimeException e) { - log.error("Deserializers with error: {}", deserializers); - throw new RecordDeserializationException(partition.topicPartition(), record.offset(), - "Error deserializing key/value for partition " + partition + - " at offset " + record.offset() + ". The record has been released.", e); + log.error("Key Deserializers with error: {}", deserializers); + throw newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY, partition.topicPartition(), timestampType, record, e, headers); } + try { + value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes); + } catch (RuntimeException e) { + log.error("Value Deserializers with error: {}", deserializers); + throw newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, partition.topicPartition(), timestampType, record, e, headers); + } + return new ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(), + record.timestamp(), timestampType, + keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(), + valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(), + key, value, headers, leaderEpoch, Optional.of(deliveryCount)); + } + + private static RecordDeserializationException newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin origin, + TopicPartition partition, + TimestampType timestampType, + Record record, + RuntimeException e, + Headers headers) { + return new RecordDeserializationException(origin, partition, record.offset(), record.timestamp(), timestampType, record.key(), record.value(), headers, + "Error deserializing " + origin.name() + " for partition " + partition + " at offset " + record.offset() + + ". 
The record has been released.", e); } private Record nextFetchedRecord(final boolean checkCrcs) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java index 503c4e162fdb4..7523af6e78584 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java @@ -21,6 +21,9 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.message.ShareFetchResponseData; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; @@ -40,6 +43,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; @@ -50,7 +54,10 @@ import java.util.Set; import java.util.UUID; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; public class ShareCompletedFetchTest { @@ -180,8 +187,10 @@ public void testCorruptedMessage() { try (final MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0); final UUIDSerializer serializer = new UUIDSerializer()) { builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, UUID.randomUUID()))); - builder.append(0L, "key1".getBytes(), "value1".getBytes()); - builder.append(0L, "key2".getBytes(), "value2".getBytes()); + builder.append(0L, "key".getBytes(), "value".getBytes()); + Headers headers = new RecordHeaders(); + headers.add("hkey", "hvalue".getBytes()); + builder.append(10L, serializer.serialize("key", UUID.randomUUID()), "otherValue".getBytes(), headers.toArray()); builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, UUID.randomUUID()))); Records records = builder.build(); @@ -195,6 +204,7 @@ public void testCorruptedMessage() { // Record 0 is returned by itself because record 1 fails to deserialize ShareInFlightBatch batch = completedFetch.fetchRecords(deserializers, 10, false); + assertNull(batch.getException()); assertEquals(1, batch.getInFlightRecords().size()); assertEquals(0L, batch.getInFlightRecords().get(0).offset()); Acknowledgements acknowledgements = batch.getAcknowledgements(); @@ -202,6 +212,16 @@ public void testCorruptedMessage() { // Record 1 then results in an empty batch batch = completedFetch.fetchRecords(deserializers, 10, false); + assertEquals(RecordDeserializationException.class, batch.getException().getClass()); + RecordDeserializationException thrown = (RecordDeserializationException) batch.getException(); + assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.KEY, thrown.origin()); + assertEquals(1, thrown.offset()); + assertEquals(TOPIC_NAME, thrown.topicPartition().topic()); + assertEquals(0, thrown.topicPartition().partition()); + 
assertEquals(0, thrown.timestamp()); + assertArrayEquals("key".getBytes(), org.apache.kafka.common.utils.Utils.toNullableArray(thrown.keyBuffer())); + assertArrayEquals("value".getBytes(), Utils.toNullableArray(thrown.valueBuffer())); + assertEquals(0, thrown.headers().toArray().length); assertEquals(0, batch.getInFlightRecords().size()); acknowledgements = batch.getAcknowledgements(); assertEquals(1, acknowledgements.size()); @@ -209,6 +229,15 @@ public void testCorruptedMessage() { // Record 2 then results in an empty batch, because record 1 has now been skipped batch = completedFetch.fetchRecords(deserializers, 10, false); + assertEquals(RecordDeserializationException.class, batch.getException().getClass()); + thrown = (RecordDeserializationException) batch.getException(); + assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, thrown.origin()); + assertEquals(2L, thrown.offset()); + assertEquals(TOPIC_NAME, thrown.topicPartition().topic()); + assertEquals(0, thrown.topicPartition().partition()); + assertEquals(10L, thrown.timestamp()); + assertNotNull(thrown.keyBuffer()); + assertArrayEquals("otherValue".getBytes(), Utils.toNullableArray(thrown.valueBuffer())); assertEquals(0, batch.getInFlightRecords().size()); acknowledgements = batch.getAcknowledgements(); assertEquals(1, acknowledgements.size()); @@ -216,6 +245,7 @@ public void testCorruptedMessage() { // Record 3 is returned in the next batch, because record 2 has now been skipped batch = completedFetch.fetchRecords(deserializers, 10, false); + assertNull(batch.getException()); assertEquals(1, batch.getInFlightRecords().size()); assertEquals(3L, batch.getInFlightRecords().get(0).offset()); acknowledgements = batch.getAcknowledgements();
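For context, here is a minimal consumer-side sketch of the dead-letter-queue pattern this change is meant to enable. It is not part of the patch: the topic names, the poll loop, and the producer/consumer configuration are illustrative assumptions; only the RecordDeserializationException accessors (origin(), timestamp(), keyBuffer(), valueBuffer(), headers(), topicPartition(), offset()) come from the change above. The handler forwards the raw bytes and headers of the failing record to a dead-letter topic, then seeks past the offending offset as the exception message suggests.

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.utils.Utils;

public class DeadLetterQueueExample {

    // Hypothetical topic names; adjust for your environment.
    private static final String SOURCE_TOPIC = "orders";
    private static final String DLQ_TOPIC = "orders.dlq";

    public static void main(String[] args) {
        // Placeholder configs: bootstrap.servers, group.id, (de)serializers, etc.
        Properties props = new Properties();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(props)) {
            consumer.subscribe(Collections.singletonList(SOURCE_TOPIC));
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    records.forEach(r -> { /* normal processing */ });
                } catch (RecordDeserializationException e) {
                    // The new accessors expose the raw record that failed to deserialize.
                    ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
                            DLQ_TOPIC,
                            null,                                   // let the partitioner choose
                            e.timestamp(),
                            Utils.toNullableArray(e.keyBuffer()),   // raw key bytes, may be null
                            Utils.toNullableArray(e.valueBuffer()), // raw value bytes, may be null
                            e.headers());                           // original record headers
                    // Record whether the key or the value failed, plus the source position.
                    dlqRecord.headers().add("dlq.origin", e.origin().name().getBytes(StandardCharsets.UTF_8));
                    dlqRecord.headers().add("dlq.source",
                            (e.topicPartition() + "@" + e.offset()).getBytes(StandardCharsets.UTF_8));
                    dlqProducer.send(dlqRecord);

                    // Skip the poison record and continue consumption.
                    TopicPartition tp = e.topicPartition();
                    consumer.seek(tp, e.offset() + 1);
                }
            }
        }
    }
}

Note that the seek-past step applies to the classic consumer path (CompletedFetch); in the share-consumer path (ShareCompletedFetch) the failing record is released automatically, as the exception message in the second patch states, so the application only needs to inspect the exception surfaced on the batch.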