AKCORE-171: Updated client with changes from KAFKA-16507 #1326

Merged · 3 commits · Jun 24, 2024
5 changes: 5 additions & 0 deletions checkstyle/import-control.xml
@@ -207,6 +207,11 @@
       <allow pkg="org.apache.kafka.common" />
     </subpackage>
 
+    <subpackage name="errors">
+      <allow class="org.apache.kafka.common.header.Headers" />
+      <allow class="org.apache.kafka.common.record.TimestampType" />
+    </subpackage>
+
   </subpackage>
 
   <subpackage name="clients">
@@ -22,6 +22,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.RecordDeserializationException.DeserializationExceptionOrigin;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -311,25 +312,39 @@ <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers,
                                             Optional<Integer> leaderEpoch,
                                             TimestampType timestampType,
                                             Record record) {
+        ByteBuffer keyBytes = record.key();
+        ByteBuffer valueBytes = record.value();
+        Headers headers = new RecordHeaders(record.headers());
+        K key;
+        V value;
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            K key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
-            ByteBuffer valueBytes = record.value();
-            V value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
-            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                    timestamp, timestampType,
-                    keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
-                    valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
-                    key, value, headers, leaderEpoch);
+            key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
         } catch (RuntimeException e) {
-            log.error("Deserializers with error: {}", deserializers);
-            throw new RecordDeserializationException(partition, record.offset(),
-                    "Error deserializing key/value for partition " + partition +
-                            " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
+            log.error("Key Deserializers with error: {}", deserializers);
+            throw newRecordDeserializationException(DeserializationExceptionOrigin.KEY, partition, timestampType, record, e, headers);
         }
+        try {
+            value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
+        } catch (RuntimeException e) {
+            log.error("Value Deserializers with error: {}", deserializers);
+            throw newRecordDeserializationException(DeserializationExceptionOrigin.VALUE, partition, timestampType, record, e, headers);
+        }
+        return new ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(),
+                record.timestamp(), timestampType,
+                keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
+                valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
+                key, value, headers, leaderEpoch);
     }
 
+    private static RecordDeserializationException newRecordDeserializationException(DeserializationExceptionOrigin origin,
+                                                                                     TopicPartition partition,
+                                                                                     TimestampType timestampType,
+                                                                                     Record record,
+                                                                                     RuntimeException e,
+                                                                                     Headers headers) {
+        return new RecordDeserializationException(origin, partition, record.offset(), record.timestamp(), timestampType, record.key(), record.value(), headers,
+                "Error deserializing " + origin.name() + " for partition " + partition + " at offset " + record.offset()
+                        + ". If needed, please seek past the record to continue consumption.", e);
+    }
+
     private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
@@ -20,6 +20,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.SerializationException;
@@ -286,25 +287,39 @@ <K, V> ConsumerRecord<K, V> parseRecord(final Deserializers<K, V> deserializers,
                                             final TimestampType timestampType,
                                             final Record record,
                                             final short deliveryCount) {
+        Headers headers = new RecordHeaders(record.headers());
+        ByteBuffer keyBytes = record.key();
+        ByteBuffer valueBytes = record.value();
+        K key;
+        V value;
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            K key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
-            ByteBuffer valueBytes = record.value();
-            V value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
-            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                    timestamp, timestampType,
-                    keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
-                    valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
-                    key, value, headers, leaderEpoch, Optional.of(deliveryCount));
+            key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
         } catch (RuntimeException e) {
-            log.error("Deserializers with error: {}", deserializers);
-            throw new RecordDeserializationException(partition.topicPartition(), record.offset(),
-                    "Error deserializing key/value for partition " + partition +
-                            " at offset " + record.offset() + ". The record has been released.", e);
+            log.error("Key Deserializers with error: {}", deserializers);
+            throw newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY, partition.topicPartition(), timestampType, record, e, headers);
        }
+        try {
+            value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
+        } catch (RuntimeException e) {
+            log.error("Value Deserializers with error: {}", deserializers);
+            throw newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, partition.topicPartition(), timestampType, record, e, headers);
+        }
+        return new ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(),
+                record.timestamp(), timestampType,
+                keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
+                valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
+                key, value, headers, leaderEpoch, Optional.of(deliveryCount));
     }
 
+    private static RecordDeserializationException newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin origin,
+                                                                                     TopicPartition partition,
+                                                                                     TimestampType timestampType,
+                                                                                     Record record,
+                                                                                     RuntimeException e,
+                                                                                     Headers headers) {
+        return new RecordDeserializationException(origin, partition, record.offset(), record.timestamp(), timestampType, record.key(), record.value(), headers,
+                "Error deserializing " + origin.name() + " for partition " + partition + " at offset " + record.offset()
+                        + ". The record has been released.", e);
+    }
+
     private Record nextFetchedRecord(final boolean checkCrcs) {
@@ -16,22 +16,74 @@
  */
 package org.apache.kafka.common.errors;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.TimestampType;
 
+import java.nio.ByteBuffer;
+
 /**
  * This exception is raised for any error that occurs while deserializing records received by the consumer using
  * the configured {@link org.apache.kafka.common.serialization.Deserializer}.
  */
 public class RecordDeserializationException extends SerializationException {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
+    public enum DeserializationExceptionOrigin {
+        KEY,
+        VALUE
+    }
+
+    private final DeserializationExceptionOrigin origin;
     private final TopicPartition partition;
     private final long offset;
+    private final TimestampType timestampType;
+    private final long timestamp;
+    private final ByteBuffer keyBuffer;
+    private final ByteBuffer valueBuffer;
+    private final Headers headers;
 
-    public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
+    @Deprecated
+    public RecordDeserializationException(TopicPartition partition,
+                                          long offset,
+                                          String message,
+                                          Throwable cause) {
         super(message, cause);
+        this.origin = null;
         this.partition = partition;
         this.offset = offset;
+        this.timestampType = TimestampType.NO_TIMESTAMP_TYPE;
+        this.timestamp = ConsumerRecord.NO_TIMESTAMP;
+        this.keyBuffer = null;
+        this.valueBuffer = null;
+        this.headers = null;
     }
 
+    public RecordDeserializationException(DeserializationExceptionOrigin origin,
+                                          TopicPartition partition,
+                                          long offset,
+                                          long timestamp,
+                                          TimestampType timestampType,
+                                          ByteBuffer keyBuffer,
+                                          ByteBuffer valueBuffer,
+                                          Headers headers,
+                                          String message,
+                                          Throwable cause) {
+        super(message, cause);
+        this.origin = origin;
+        this.offset = offset;
+        this.timestampType = timestampType;
+        this.timestamp = timestamp;
+        this.partition = partition;
+        this.keyBuffer = keyBuffer;
+        this.valueBuffer = valueBuffer;
+        this.headers = headers;
+    }
+
+    public DeserializationExceptionOrigin origin() {
+        return origin;
+    }
 
     public TopicPartition topicPartition() {
@@ -41,4 +93,24 @@ public TopicPartition topicPartition() {
     public long offset() {
         return offset;
     }
+
+    public TimestampType timestampType() {
+        return timestampType;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    public ByteBuffer keyBuffer() {
+        return keyBuffer;
+    }
+
+    public ByteBuffer valueBuffer() {
+        return valueBuffer;
+    }
+
+    public Headers headers() {
+        return headers;
+    }
 }
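For context (illustrative, not part of this change): the extra state carried by the exception is intended to let a consumer application deal with a "poison pill" record instead of failing repeatedly on it. Below is a minimal sketch of a poll loop that catches the exception, logs the new metadata (origin(), topicPartition(), offset(), timestamp()) and seeks past the bad record, as the exception message suggests. The bootstrap servers, topic, group id and String deserializers are assumptions made for the example, not taken from this PR.

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SkipPoisonPillExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "example-group");              // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic")); // hypothetical topic
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    records.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
                } catch (RecordDeserializationException e) {
                    // The new accessors tell us which side failed and carry the raw payload
                    // (keyBuffer()/valueBuffer()/headers()) for debugging or dead-lettering.
                    System.err.printf("Failed to deserialize %s of record at %s, offset %d, timestamp %d%n",
                            e.origin(), e.topicPartition(), e.offset(), e.timestamp());
                    // Skip the poison pill so the next poll() can make progress.
                    consumer.seek(e.topicPartition(), e.offset() + 1);
                }
            }
        }
    }
}
```

Because the undecoded key, value and headers are now available on the exception, an application could, for example, forward the raw record to a dead-letter topic before seeking on, rather than just logging and skipping it.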
@@ -22,6 +22,8 @@
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -41,14 +43,17 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class CompletedFetchTest {
@@ -161,6 +166,10 @@ public void testCorruptedMessage() {
                  final UUIDSerializer serializer = new UUIDSerializer()) {
             builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, UUID.randomUUID())));
             builder.append(0L, "key".getBytes(), "value".getBytes());
+            builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, UUID.randomUUID())));
+            Headers headers = new RecordHeaders();
+            headers.add("hkey", "hvalue".getBytes());
+            builder.append(10L, serializer.serialize("key", UUID.randomUUID()), "otherValue".getBytes(), headers.toArray());
             Records records = builder.build();
 
             FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
@@ -176,8 +185,29 @@
 
             completedFetch.fetchRecords(fetchConfig, deserializers, 10);
 
-            assertThrows(RecordDeserializationException.class,
+            RecordDeserializationException thrown = assertThrows(RecordDeserializationException.class,
                     () -> completedFetch.fetchRecords(fetchConfig, deserializers, 10));
+            assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.KEY, thrown.origin());
+            assertEquals(1, thrown.offset());
+            assertEquals(TOPIC_NAME, thrown.topicPartition().topic());
+            assertEquals(0, thrown.topicPartition().partition());
+            assertEquals(0, thrown.timestamp());
+            assertArrayEquals("key".getBytes(), Utils.toNullableArray(thrown.keyBuffer()));
+            assertArrayEquals("value".getBytes(), Utils.toNullableArray(thrown.valueBuffer()));
+            assertEquals(0, thrown.headers().toArray().length);
+
+            CompletedFetch completedFetch2 = newCompletedFetch(2, partitionData);
+            completedFetch2.fetchRecords(fetchConfig, deserializers, 10);
+            RecordDeserializationException valueThrown = assertThrows(RecordDeserializationException.class,
+                    () -> completedFetch2.fetchRecords(fetchConfig, deserializers, 10));
+            assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, valueThrown.origin());
+            assertEquals(3, valueThrown.offset());
+            assertEquals(TOPIC_NAME, valueThrown.topicPartition().topic());
+            assertEquals(0, valueThrown.topicPartition().partition());
+            assertEquals(10L, valueThrown.timestamp());
+            assertNotNull(valueThrown.keyBuffer());
+            assertArrayEquals("otherValue".getBytes(), Utils.toNullableArray(valueThrown.valueBuffer()));
+            assertEquals(headers, valueThrown.headers());
         }
     }
 }