From 2ba47a30926217d08d83b5be12c5e79c27a71fc0 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 21 Oct 2022 12:09:50 +0100 Subject: [PATCH] feature: Simple retriable exception to remove error spam from logs (#444) --- CHANGELOG.adoc | 4 +++ README.adoc | 14 ++++++++-- .../PCRetriableException.java | 23 +++++++++++++++ .../parallelconsumer/ParallelConsumer.java | 5 ++-- .../ParallelStreamProcessor.java | 28 +++++++++++++------ .../AbstractParallelEoSStreamProcessor.java | 15 ++++++---- .../LargeVolumeInMemoryTests.java | 5 ++-- .../TransactionMarkersTest.java | 3 +- .../PartitionStateCommittedOffsetIT.java | 3 +- .../FakeRuntimeException.java | 12 ++++---- .../ParallelEoSStreamProcessorTest.java | 4 +-- src/docs/README_TEMPLATE.adoc | 6 +++- 12 files changed, 90 insertions(+), 32 deletions(-) create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index 7dec603fe..750ac226a 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -16,6 +16,10 @@ endif::[] == 0.5.2.4 +=== Improvements + +* feature: Simple PCRetriableException to remove error spam from logs (#444) + === Fixes * fixes #409: Adds support for compacted topics and commit offset resetting (#425) diff --git a/README.adoc b/README.adoc index 0ae6c91cc..3e3be8822 100644 --- a/README.adoc +++ b/README.adoc @@ -477,8 +477,8 @@ See {issues_link}/12[issue #12], and the `ParallelConsumer` JavaDoc: /** * Asynchronous / concurrent message consumer for Kafka. *

- * Currently there is no direct implementation, only the {@link ParallelStreamProcessor} version (see {@link - * AbstractParallelEoSStreamProcessor}), but there may be in the future. + * Currently, there is no direct implementation, only the {@link ParallelStreamProcessor} version (see + * {@link AbstractParallelEoSStreamProcessor}), but there may be in the future. * * @param key consume / produce key type * @param value consume / produce value type @@ -738,7 +738,11 @@ Ordering guarantees will always be adhered to, regardless of failure. A failure is denoted by *any* exception being thrown from the user's processing function. The system catches these exceptions, logs them and replaces the record in the queue for processing later. All types of Exceptions thrown are considered retriable. -To not retry a record, do not throw an exception from your processing fuction. +To not retry a record, do not throw an exception from your processing function. + +TIP:: To avoid the system logging an error, throw an exception which extends PCRetriableException. + +TIP:: If there was an error processing a record, and you'd like to skip it - do not throw an exception, and the system will mark the record as succeeded. If for some reason you want to proactively fail a record, without relying on some other system throwing an exception which you don't catch - simply throw an exception of your own design, which the system will treat the same way. 
@@ -1289,6 +1293,10 @@ endif::[] == 0.5.2.4 +=== Improvements + +* feature: Simple PCRetriableException to remove error spam from logs (#444) + === Fixes * fixes #409: Adds support for compacted topics and commit offset resetting (#425) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java new file mode 100644 index 000000000..7fe1a3ad7 --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java @@ -0,0 +1,23 @@ +package io.confluent.parallelconsumer; + +/*- + * Copyright (C) 2020-2022 Confluent, Inc. + */ + +import lombok.experimental.StandardException; + +/** + * A user's processing function can throw this exception, which signals to PC that processing of the message has failed, + * and that it should be retried at a later time. + *

+ * The advantage of throwing this exception explicitly, is that PC will not log an ERROR. If any other type of exception + * is thrown by the user's function, that will be logged as an error (but will still be retried later). + *

+ * So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be + * logged as an error. + * + * @author Antony Stubbs + */ +@StandardException +public class PCRetriableException extends RuntimeException { +} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumer.java index 218f77fe8..8116adb69 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumer.java @@ -14,12 +14,11 @@ import java.util.regex.Pattern; // tag::javadoc[] - /** * Asynchronous / concurrent message consumer for Kafka. *

- * Currently there is no direct implementation, only the {@link ParallelStreamProcessor} version (see {@link - * AbstractParallelEoSStreamProcessor}), but there may be in the future. + * Currently, there is no direct implementation, only the {@link ParallelStreamProcessor} version (see + * {@link AbstractParallelEoSStreamProcessor}), but there may be in the future. * * @param key consume / produce key type * @param value consume / produce value type diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java index 5837631f8..1e63f7784 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java @@ -27,7 +27,9 @@ static ParallelStreamProcessor createEosStreamProcessor(Paralle } /** - * Register a function to be applied in parallel to each received message + * Register a function to be applied in parallel to each received message. + *

+ * Throw a {@link PCRetriableException} to retry the message without the system logging an ERROR level message. * * @param usersVoidConsumptionFunction the function */ @@ -36,8 +38,10 @@ static ParallelStreamProcessor createEosStreamProcessor(Paralle /** - * Register a function to be applied in parallel to each received message, which in turn returns one or more {@link - * ProducerRecord}s to be sent back to the broker. + * Register a function to be applied in parallel to each received message, which in turn returns one or more + * {@link ProducerRecord}s to be sent back to the broker. + *

+ * Throw a {@link PCRetriableException} to retry the message without the system logging an ERROR level message. * * @param callback applied after the produced message is acknowledged by kafka */ @@ -45,20 +49,26 @@ void pollAndProduceMany(Function, List>> Consumer> callback); /** - * Register a function to be applied in parallel to each received message, which in turn returns one or many {@link - * ProducerRecord}s to be sent back to the broker. + * Register a function to be applied in parallel to each received message, which in turn returns one or many + * {@link ProducerRecord}s to be sent back to the broker. + *

+ * Throw a {@link PCRetriableException} to retry the message without the system logging an ERROR level message. */ void pollAndProduceMany(Function, List>> userFunction); /** - * Register a function to be applied in parallel to each received message, which in turn returns a {@link - * ProducerRecord} to be sent back to the broker. + * Register a function to be applied in parallel to each received message, which in turn returns a + * {@link ProducerRecord} to be sent back to the broker. + *

+ * Throw a {@link PCRetriableException} to retry the message without the system logging an ERROR level message. */ void pollAndProduce(Function, ProducerRecord> userFunction); /** - * Register a function to be applied in parallel to each received message, which in turn returns a {@link - * ProducerRecord} to be sent back to the broker. + * Register a function to be applied in parallel to each received message, which in turn returns a + * {@link ProducerRecord} to be sent back to the broker. + *

+ * Throw a {@link PCRetriableException} to retry the message without the system logging an ERROR level message. * * @param callback applied after the produced message is acknowledged by kafka */ diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index c78ad1769..8f3b2bbcb 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -5,10 +5,7 @@ */ import io.confluent.csid.utils.TimeUtils; -import io.confluent.parallelconsumer.ExceptionInUserFunctionException; -import io.confluent.parallelconsumer.ParallelConsumer; -import io.confluent.parallelconsumer.ParallelConsumerOptions; -import io.confluent.parallelconsumer.PollContextInternal; +import io.confluent.parallelconsumer.*; import io.confluent.parallelconsumer.state.WorkContainer; import io.confluent.parallelconsumer.state.WorkManager; import lombok.*; @@ -1174,7 +1171,15 @@ protected List, R>> runUserFunct return intermediateResults; } catch (Exception e) { // handle fail - log.error("Exception caught in user function running stage, registering WC as failed, returning to mailbox. Context: {}", context, e); + var cause = e.getCause(); + String msg = msg("Exception caught in user function running stage, registering WC as failed, returning to" + + " mailbox. Context: {}", context, e); + if (cause instanceof PCRetriableException) { + log.debug("Explicit " + PCRetriableException.class.getSimpleName() + " caught, logging at DEBUG only. 
" + msg, e); + } else { + log.error(msg, e); + } + for (var wc : workContainerBatch) { wc.onUserFunctionFailure(e); addToMailbox(context, wc); // always add on error diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java index 903d9c098..9abc4cc46 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java @@ -6,6 +6,7 @@ import io.confluent.csid.utils.ProgressBarUtils; import io.confluent.csid.utils.ThreadUtils; +import io.confluent.parallelconsumer.FakeRuntimeException; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode; import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase; @@ -257,7 +258,7 @@ private void testTiming(int numberOfKeys, int quantityOfMessagesToProduce) { int i = Integer.parseInt(rec.value()); if (stepIndex != i) { log.error("bad step: {} vs {}", stepIndex, i); - throw new RuntimeException("bad process step, expected message is missing: " + stepIndex + " vs " + i); + throw new FakeRuntimeException("bad process step, expected message is missing: " + stepIndex + " vs " + i); } stepIndex++; } @@ -284,7 +285,7 @@ private void testTiming(int numberOfKeys, int quantityOfMessagesToProduce) { } if (!missing.isEmpty()) log.error("Missing: {}", missing); - throw new RuntimeException("bad step, expected message(s) is missing: " + missing); + throw new FakeRuntimeException("bad step, expected message(s) is missing: " + missing); } assertThat(producerSpy.history()).as("Finally, all messages expected messages were 
produced").hasSize(quantityOfMessagesToProduce); diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionMarkersTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionMarkersTest.java index 0feb10655..fa531181d 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionMarkersTest.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionMarkersTest.java @@ -4,6 +4,7 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ +import io.confluent.parallelconsumer.FakeRuntimeException; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelEoSStreamProcessor; import io.confluent.parallelconsumer.offsets.OffsetSimultaneousEncoder; @@ -157,7 +158,7 @@ private void runPcAndBlockRecordsOverLimitIndex(int blockOver) { log.debug(msg("{} over block limit of {}, blocking...", index, blockOver)); Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException e) { - throw new RuntimeException(e); + throw new FakeRuntimeException(e); } } }); diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/state/PartitionStateCommittedOffsetIT.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/state/PartitionStateCommittedOffsetIT.java index f44738f24..eb014f92a 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/state/PartitionStateCommittedOffsetIT.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/state/PartitionStateCommittedOffsetIT.java @@ -7,6 +7,7 @@ import com.google.common.truth.StringSubject; import io.confluent.csid.utils.JavaUtils; import io.confluent.csid.utils.ThreadUtils; 
+import io.confluent.parallelconsumer.FakeRuntimeException; import io.confluent.parallelconsumer.ManagedTruth; import io.confluent.parallelconsumer.ParallelEoSStreamProcessor; import io.confluent.parallelconsumer.PollContext; @@ -368,7 +369,7 @@ private List> runPcUntilOffset(OffsetResetStrategy o log.debug("Exceptional offset {} succeeded", thisOffset); } else if (thisOffset >= succeedUpToOffset) { log.debug("Failing on {}", thisOffset); - throw new RuntimeException("Failing on " + thisOffset); + throw new FakeRuntimeException("Failing on " + thisOffset); } else { log.debug("Succeeded {}: {}", thisOffset, pollContext.getSingleRecord()); succeededOffsets.add(pollContext); diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeException.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeException.java index 8f57c385b..2875b691a 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeException.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeException.java @@ -4,11 +4,13 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ +import lombok.experimental.StandardException; + /** - * Used for testing error handling - easier to identify than a plan exception. + * Used for testing error handling - easier to identify than a plain exception. 
+ * + * @author Antony Stubbs */ -public class FakeRuntimeException extends RuntimeException { - public FakeRuntimeException(String msg) { - super(msg); - } +@StandardException +public class FakeRuntimeException extends PCRetriableException { } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java index f7c243a7c..c91d4877e 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java @@ -71,7 +71,7 @@ public void failingActionNothingCommitted(CommitMode commitMode) { setupParallelConsumerInstance(commitMode); parallelConsumer.poll((ignore) -> { - throw new RuntimeException("My user's function error"); + throw new FakeRuntimeException("My user's function error"); }); // let it process @@ -108,7 +108,7 @@ void offsetsAreNeverCommittedForMessagesStillInFlightSimplest(CommitMode commitM // finish processing only msg 1 parallelConsumer.poll(context -> { - log.error("msg: {}", context); + log.debug("msg: {}", context); startBarrierLatch.countDown(); int offset = (int) context.offset(); LatchTestUtils.awaitLatch(locks, offset); diff --git a/src/docs/README_TEMPLATE.adoc b/src/docs/README_TEMPLATE.adoc index 49c18cbfd..a4c509cb2 100644 --- a/src/docs/README_TEMPLATE.adoc +++ b/src/docs/README_TEMPLATE.adoc @@ -641,7 +641,11 @@ Ordering guarantees will always be adhered to, regardless of failure. A failure is denoted by *any* exception being thrown from the user's processing function. The system catches these exceptions, logs them and replaces the record in the queue for processing later. All types of Exceptions thrown are considered retriable. -To not retry a record, do not throw an exception from your processing fuction. 
+To not retry a record, do not throw an exception from your processing function. + +TIP: To avoid the system logging an error, throw an exception which extends PCRetriableException. + +TIP: If there was an error processing a record, and you'd like to skip it - do not throw an exception, and the system will mark the record as succeeded. If for some reason you want to proactively fail a record, without relying on some other system throwing an exception which you don't catch - simply throw an exception of your own design, which the system will treat the same way.