diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerException.java index edbdd5b51..3826e1f79 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerException.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerException.java @@ -8,6 +8,8 @@ /** * Generic Parallel Consumer {@link RuntimeException} parent. + * + * @author Antony Stubbs */ @StandardException public class ParallelConsumerException extends RuntimeException { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index 5d871da7f..ca12e8982 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -25,6 +25,7 @@ /** * The options for the {@link AbstractParallelEoSStreamProcessor} system. * + * @author Antony Stubbs * @see #builder() * @see ParallelConsumerOptions.ParallelConsumerOptionsBuilder */ diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/BitSetEncoder.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/BitSetEncoder.java index 64e607de1..236b605ab 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/BitSetEncoder.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/BitSetEncoder.java @@ -37,6 +37,7 @@ * @see PartitionState#incompleteOffsets * @see RunLengthEncoder * @see OffsetBitSet + * @author Antony Stubbs */ @Slf4j public class BitSetEncoder extends OffsetEncoder { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/BitSetEncodingNotSupportedException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/BitSetEncodingNotSupportedException.java index 0a0e1c340..b0b7b98d5 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/BitSetEncodingNotSupportedException.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/BitSetEncodingNotSupportedException.java @@ -1,12 +1,16 @@ package io.confluent.parallelconsumer.offsets; /*- - * Copyright (C) 2020-2021 Confluent, Inc. + * Copyright (C) 2020-2022 Confluent, Inc. */ -public class BitSetEncodingNotSupportedException extends EncodingNotSupportedException { - public BitSetEncodingNotSupportedException(String msg) { - super(msg); - } +import lombok.experimental.StandardException; +/** + * Thrown under situations where the {@link BitSetEncoder} would not be able to encode the given data. + * + * @author Antony Stubbs + */ +@StandardException +public class BitSetEncodingNotSupportedException extends EncodingNotSupportedException { } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/ByteBufferEncoder.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/ByteBufferEncoder.java index 7459443b0..7b8f56351 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/ByteBufferEncoder.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/ByteBufferEncoder.java @@ -9,6 +9,12 @@ import static io.confluent.parallelconsumer.offsets.OffsetEncoding.ByteArray; import static io.confluent.parallelconsumer.offsets.OffsetEncoding.ByteArrayCompressed; +/** + * Encodes offsets into a {@link ByteBuffer}. Doesn't have any advantage over the {@link BitSetEncoder} and + * {@link RunLengthEncoder}, but can be useful for testing and comparison. + * + * @author Antony Stubbs + */ public class ByteBufferEncoder extends OffsetEncoder { private final ByteBuffer bytesBuffer; diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/EncodedOffsetPair.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/EncodedOffsetPair.java index b25774f79..609b90cd2 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/EncodedOffsetPair.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/EncodedOffsetPair.java @@ -23,6 +23,10 @@ import static io.confluent.parallelconsumer.offsets.OffsetSimpleSerialisation.deserialiseByteArrayToBitMapString; /** + * Encapsulates the encoding type, and the actual encoded data, when creating an offset map encoding. Central place for + * decoding the data. + * + * @author Antony Stubbs * @see #unwrap */ @Slf4j @@ -110,7 +114,8 @@ public HighestOffsetAndIncompletes getDecodedIncompletes(long baseOffset) { case BitSetV2Compressed -> deserialiseBitSetWrapToIncompletes(BitSetV2, baseOffset, decompressZstd(data)); case RunLengthV2 -> runLengthDecodeToIncompletes(encoding, baseOffset, data); case RunLengthV2Compressed -> runLengthDecodeToIncompletes(RunLengthV2, baseOffset, decompressZstd(data)); - default -> throw new UnsupportedOperationException("Encoding (" + encoding.description() + ") not supported"); + default -> + throw new UnsupportedOperationException("Encoding (" + encoding.description() + ") not supported"); }; return binaryArrayString; } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/EncodingNotSupportedException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/EncodingNotSupportedException.java index 378f9bd11..58097d6ea 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/EncodingNotSupportedException.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/EncodingNotSupportedException.java @@ -5,12 +5,13 @@ */ import io.confluent.parallelconsumer.internal.InternalException; +import lombok.experimental.StandardException; -/*- - * Copyright (C) 2020-2021 Confluent, Inc. +/** + * Parent of the exceptions for when the {@link OffsetEncoder} cannot encode the given data. + * + * @author Antony Stubbs */ +@StandardException public class EncodingNotSupportedException extends InternalException { - public EncodingNotSupportedException(final String message) { - super(message); - } } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/NoEncodingPossibleException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/NoEncodingPossibleException.java index d7cce4a70..2b33ab767 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/NoEncodingPossibleException.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/NoEncodingPossibleException.java @@ -5,10 +5,13 @@ */ import io.confluent.parallelconsumer.internal.InternalException; +import lombok.experimental.StandardException; +/** + * Throw when for whatever reason, no encoding of the offsets is possible. + * + * @author Antony Stubbs + */ +@StandardException public class NoEncodingPossibleException extends InternalException { - - public NoEncodingPossibleException(String msg) { - super(msg); - } } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetBitSet.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetBitSet.java index 0058a4994..cd846f139 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetBitSet.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetBitSet.java @@ -19,6 +19,7 @@ *
* todo unify or refactor with {@link BitSetEncoder}. Why was it ever seperate? * + * @author Antony Stubbs * @see BitSetEncoder */ @Slf4j diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetDecodingError.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetDecodingError.java index d69d6aec5..2a1a8f17e 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetDecodingError.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetDecodingError.java @@ -5,14 +5,15 @@ */ import io.confluent.parallelconsumer.internal.InternalException; +import lombok.experimental.StandardException; /*- * Error decoding offsets * * TODO should extend java.lang.Error ? + * + * @author Antony Stubbs */ +@StandardException public class OffsetDecodingError extends InternalException { - public OffsetDecodingError(final String s, final IllegalArgumentException a) { - super(s, a); - } } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetEncoder.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetEncoder.java index 76f6ba7ef..6a97cbcee 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetEncoder.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetEncoder.java @@ -11,7 +11,9 @@ import java.nio.ByteBuffer; /** - * Base OffsetEncoder + * Base OffsetEncoder, defining the contract for encoding offset data. + * + * @author Antony Stubbs */ @Slf4j public abstract class OffsetEncoder { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetMapCodecManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetMapCodecManager.java index e71377a29..cd4010775 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetMapCodecManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetMapCodecManager.java @@ -34,6 +34,8 @@ *
* Have results in an accessible structure, easily selecting the highest compression. * + * @author Antony Stubbs * @see #invoke() */ @Slf4j diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/RunLengthEncoder.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/RunLengthEncoder.java index 9138db719..396159d59 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/RunLengthEncoder.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/RunLengthEncoder.java @@ -20,6 +20,8 @@ *
* One such nature is that gaps between completed offsets get encoded as succeeded offsets. This doesn't matter because * they don't exist and we'll neve see them (they no longer exist in the source partition). + * + * @author Antony Stubbs */ public class RunLengthEncoder extends OffsetEncoder { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/RunlengthV1EncodingNotSupported.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/RunlengthV1EncodingNotSupported.java index 56360a57c..c333b7743 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/RunlengthV1EncodingNotSupported.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/RunlengthV1EncodingNotSupported.java @@ -1,10 +1,16 @@ package io.confluent.parallelconsumer.offsets; /*- - * Copyright (C) 2020-2021 Confluent, Inc. + * Copyright (C) 2020-2022 Confluent, Inc. */ + +import lombok.experimental.StandardException; + +/** + * Thrown when Runlength V1 encoding is not supported. + * + * @author Antony Stubbs + */ +@StandardException public class RunlengthV1EncodingNotSupported extends EncodingNotSupportedException { - public RunlengthV1EncodingNotSupported(final String msg) { - super(msg); - } } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java index d231130c8..b3e8309e5 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java @@ -32,6 +32,7 @@ /** * Our view of the state of the partitions that we've been assigned. * + * @author Antony Stubbs * @see PartitionStateManager */ // todo class becoming large - possible to extract some functionality? diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java index d8fad68fc..afc9a3e2b 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java @@ -29,6 +29,7 @@ *
* This state is shared between the {@link BrokerPollSystem} thread and the {@link AbstractParallelEoSStreamProcessor}.
*
+ * @author Antony Stubbs
* @see PartitionState
*/
@Slf4j
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java
index de3dbd11c..a72187daf 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java
@@ -24,6 +24,8 @@
/**
* Models the queue of work to be processed, based on the {@link ProcessingOrder} modes.
+ *
+ * @author Antony Stubbs
*/
@Slf4j
@RequiredArgsConstructor
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/RemovedPartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/RemovedPartitionState.java
index 8fd3133f9..bef1475cc 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/RemovedPartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/RemovedPartitionState.java
@@ -28,6 +28,8 @@
* leak as the collection will forever expand. However, even massive partition counts to a single consumer would be in
* the hundreds of thousands, this would only result in hundreds of thousands of {@link TopicPartition} object keys all
* pointing to the same instance of {@link RemovedPartitionState}.
+ *
+ * @author Antony Stubbs
*/
@Slf4j
public class RemovedPartitionState
- * Low Water Mark - the highest offset (continuously successful) with all it's previous messages succeeded (the offset
+ * Low Watermark - the highest offset (continuously successful) with all it's previous messages succeeded (the offset
* one commits to broker)
*
* High Water Mark - the highest offset which has succeeded (previous may be incomplete)
@@ -32,8 +32,7 @@
*
* This state is shared between the {@link BrokerPollSystem} thread and the {@link AbstractParallelEoSStreamProcessor}.
*
- * @param