Commit

feature: Simple retriable exception to remove error spam from logs (#444)
astubbs authored Oct 21, 2022
1 parent 4057c2d commit 2ba47a3
Showing 12 changed files with 90 additions and 32 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.adoc
@@ -16,6 +16,10 @@ endif::[]

== 0.5.2.4

=== Improvements

* feature: Simple PCRetriableException to remove error spam from logs (#444)

=== Fixes

* fixes #409: Adds support for compacted topics and commit offset resetting (#425)
14 changes: 11 additions & 3 deletions README.adoc
@@ -477,8 +477,8 @@ See {issues_link}/12[issue #12], and the `ParallelConsumer` JavaDoc:
/**
* Asynchronous / concurrent message consumer for Kafka.
* <p>
* Currently there is no direct implementation, only the {@link ParallelStreamProcessor} version (see {@link
* AbstractParallelEoSStreamProcessor}), but there may be in the future.
* Currently, there is no direct implementation, only the {@link ParallelStreamProcessor} version (see
* {@link AbstractParallelEoSStreamProcessor}), but there may be in the future.
*
* @param <K> key consume / produce key type
* @param <V> value consume / produce value type
@@ -738,7 +738,11 @@ Ordering guarantees will always be adhered to, regardless of failure.
A failure is denoted by *any* exception being thrown from the user's processing function.
The system catches these exceptions, logs them and replaces the record in the queue for processing later.
All types of Exceptions thrown are considered retriable.
To not retry a record, do not throw an exception from your processing fuction.
To not retry a record, do not throw an exception from your processing function.

TIP:: To avoid the system logging an error, throw an exception which extends PCRetriableException.

TIP:: If there was an error processing a record, and you'd like to skip it - do not throw an exception, and the system will mark the record as succeeded.

If for some reason you want to proactively fail a record, without relying on some other system throwing an exception which you don't catch - simply throw an exception of your own design, which the system will treat the same way.
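The retry and logging semantics described above can be sketched as follows. This is a simplified, self-contained illustration, not the library's implementation (which inspects the cause of a wrapped exception inside `AbstractParallelEoSStreamProcessor`); the `PCRetriableException` here is a local stand-in for `io.confluent.parallelconsumer.PCRetriableException`, and all other names are hypothetical:

```java
// Sketch of PC's failure handling: any exception means "retry later"; only the
// exception type decides whether an ERROR is logged. All names except
// PCRetriableException are hypothetical stand-ins for illustration.
public class RetrySemanticsSketch {

    // Stand-in for io.confluent.parallelconsumer.PCRetriableException
    static class PCRetriableException extends RuntimeException {
        PCRetriableException(String message) { super(message); }
    }

    enum Outcome { SUCCEEDED, RETRY_QUIET, RETRY_LOGGED }

    // No exception -> success; PCRetriableException -> retry, DEBUG-level log
    // only; any other exception -> retry, but logged as ERROR.
    static Outcome process(Runnable userFunction) {
        try {
            userFunction.run();
            return Outcome.SUCCEEDED;
        } catch (PCRetriableException e) {
            return Outcome.RETRY_QUIET;
        } catch (RuntimeException e) {
            return Outcome.RETRY_LOGGED;
        }
    }

    public static void main(String[] args) {
        System.out.println(process(() -> {}));                                          // SUCCEEDED
        System.out.println(process(() -> { throw new PCRetriableException("503"); }));  // RETRY_QUIET
        System.out.println(process(() -> { throw new IllegalStateException("bug"); })); // RETRY_LOGGED
    }
}
```

In both quiet and logged cases the record is re-queued; the exception type only changes the log level.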

@@ -1289,6 +1293,10 @@ endif::[]

== 0.5.2.4

=== Improvements

* feature: Simple PCRetriableException to remove error spam from logs (#444)

=== Fixes

* fixes #409: Adds support for compacted topics and commit offset resetting (#425)
@@ -0,0 +1,23 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.experimental.StandardException;

/**
* A user's processing function can throw this exception, which signals to PC that processing of the message has failed,
 * and that it should be retried at a later time.
* <p>
* The advantage of throwing this exception explicitly, is that PC will not log an ERROR. If any other type of exception
* is thrown by the user's function, that will be logged as an error (but will still be retried later).
* <p>
* So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be
* logged as an error.
*
* @author Antony Stubbs
*/
@StandardException
public class PCRetriableException extends RuntimeException {
}
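Lombok's `@StandardException` generates the standard `Throwable` constructor set. A hand-written approximation of the class above, assuming Lombok's documented behavior (the cause-only constructor derives its message from the cause, and the cause is attached via `initCause`), would look roughly like:

```java
// Hand-written approximation of what Lombok's @StandardException generates;
// the class name is suffixed "Sketch" to mark it as an illustration, not the
// real io.confluent.parallelconsumer.PCRetriableException.
public class PCRetriableExceptionSketch extends RuntimeException {
    public PCRetriableExceptionSketch() {
        this(null, null);
    }

    public PCRetriableExceptionSketch(String message) {
        this(message, null);
    }

    public PCRetriableExceptionSketch(Throwable cause) {
        // Message falls back to the cause's message, per Lombok's docs
        this(cause != null ? cause.getMessage() : null, cause);
    }

    public PCRetriableExceptionSketch(String message, Throwable cause) {
        super(message);
        if (cause != null) initCause(cause);
    }
}
```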
@@ -14,12 +14,11 @@
import java.util.regex.Pattern;

// tag::javadoc[]

/**
* Asynchronous / concurrent message consumer for Kafka.
* <p>
* Currently there is no direct implementation, only the {@link ParallelStreamProcessor} version (see {@link
* AbstractParallelEoSStreamProcessor}), but there may be in the future.
* Currently, there is no direct implementation, only the {@link ParallelStreamProcessor} version (see
* {@link AbstractParallelEoSStreamProcessor}), but there may be in the future.
*
* @param <K> key consume / produce key type
* @param <V> value consume / produce value type
@@ -27,7 +27,9 @@ static <KK, VV> ParallelStreamProcessor<KK, VV> createEosStreamProcessor(Paralle
}

/**
* Register a function to be applied in parallel to each received message
* Register a function to be applied in parallel to each received message.
* <p>
* Throw a {@link PCRetriableException} to retry the message without the system logging an ERROR level message.
*
* @param usersVoidConsumptionFunction the function
*/
@@ -36,29 +38,37 @@ static <KK, VV> ParallelStreamProcessor<KK, VV> createEosStreamProcessor(Paralle


/**
* Register a function to be applied in parallel to each received message, which in turn returns one or more {@link
* ProducerRecord}s to be sent back to the broker.
* Register a function to be applied in parallel to each received message, which in turn returns one or more
* {@link ProducerRecord}s to be sent back to the broker.
* <p>
* Throw a {@link PCRetriableException} to retry the message without the system logging an ERROR level message.
*
* @param callback applied after the produced message is acknowledged by kafka
*/
void pollAndProduceMany(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction,
Consumer<ConsumeProduceResult<K, V, K, V>> callback);

/**
* Register a function to be applied in parallel to each received message, which in turn returns one or many {@link
* ProducerRecord}s to be sent back to the broker.
* Register a function to be applied in parallel to each received message, which in turn returns one or many
* {@link ProducerRecord}s to be sent back to the broker.
* <p>
* Throw a {@link PCRetriableException} to retry the message without the system logging an ERROR level message.
*/
void pollAndProduceMany(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction);

/**
* Register a function to be applied in parallel to each received message, which in turn returns a {@link
* ProducerRecord} to be sent back to the broker.
* Register a function to be applied in parallel to each received message, which in turn returns a
* {@link ProducerRecord} to be sent back to the broker.
* <p>
* Throw a {@link PCRetriableException} to retry the message without the system logging an ERROR level message.
*/
void pollAndProduce(Function<PollContext<K, V>, ProducerRecord<K, V>> userFunction);

/**
* Register a function to be applied in parallel to each received message, which in turn returns a {@link
* ProducerRecord} to be sent back to the broker.
* Register a function to be applied in parallel to each received message, which in turn returns a
* {@link ProducerRecord} to be sent back to the broker.
* <p>
* Throw a {@link PCRetriableException} to retry the message without the system logging an ERROR level message.
*
* @param callback applied after the produced message is acknowledged by kafka
*/
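From the user side, the pattern these javadocs describe looks like the following. The processor and context types are simplified local stand-ins so the sketch is self-contained; only the act of throwing `PCRetriableException` from the user function reflects the real API contract:

```java
import java.util.function.Consumer;

// User-side sketch: where to throw PCRetriableException inside a poll()
// function. The poll dispatch and the types below are simplified stand-ins,
// not the real parallel-consumer API.
public class PollUsageSketch {

    static class PCRetriableException extends RuntimeException {
        PCRetriableException(String message) { super(message); }
    }

    // Minimal stand-in for PollContext<K, V>
    record PollContext(String key, String value) {}

    // Minimal stand-in for ParallelStreamProcessor#poll: runs the user
    // function once per record (the real library does this concurrently).
    static void poll(Consumer<PollContext> userFunction, PollContext record) {
        userFunction.accept(record);
    }

    public static void main(String[] args) {
        poll(context -> {
            boolean downstreamUnavailable = false; // e.g. a transient HTTP 503
            if (downstreamUnavailable) {
                // Quiet retry: PC re-queues the record, logging at DEBUG only
                throw new PCRetriableException("downstream unavailable for " + context.key());
            }
            System.out.println("processed " + context.value()); // prints "processed v1"
        }, new PollContext("k1", "v1"));
    }
}
```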
@@ -5,10 +5,7 @@
*/

import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.ExceptionInUserFunctionException;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.*;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import lombok.*;
@@ -1174,7 +1171,15 @@ protected <R> List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> runUserFunct
return intermediateResults;
} catch (Exception e) {
// handle fail
log.error("Exception caught in user function running stage, registering WC as failed, returning to mailbox. Context: {}", context, e);
var cause = e.getCause();
String msg = msg("Exception caught in user function running stage, registering WC as failed, returning to" +
" mailbox. Context: {}", context, e);
if (cause instanceof PCRetriableException) {
log.debug("Explicit " + PCRetriableException.class.getSimpleName() + " caught, logging at DEBUG only. " + msg, e);
} else {
log.error(msg, e);
}

for (var wc : workContainerBatch) {
wc.onUserFunctionFailure(e);
addToMailbox(context, wc); // always add on error
@@ -6,6 +6,7 @@

import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
@@ -257,7 +258,7 @@ private void testTiming(int numberOfKeys, int quantityOfMessagesToProduce) {
int i = Integer.parseInt(rec.value());
if (stepIndex != i) {
log.error("bad step: {} vs {}", stepIndex, i);
throw new RuntimeException("bad process step, expected message is missing: " + stepIndex + " vs " + i);
throw new FakeRuntimeException("bad process step, expected message is missing: " + stepIndex + " vs " + i);
}
stepIndex++;
}
@@ -284,7 +285,7 @@ private void testTiming(int numberOfKeys, int quantityOfMessagesToProduce) {
}
if (!missing.isEmpty())
log.error("Missing: {}", missing);
throw new RuntimeException("bad step, expected message(s) is missing: " + missing);
throw new FakeRuntimeException("bad step, expected message(s) is missing: " + missing);
}

assertThat(producerSpy.history()).as("Finally, all messages expected messages were produced").hasSize(quantityOfMessagesToProduce);
@@ -4,6 +4,7 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.offsets.OffsetSimultaneousEncoder;
@@ -157,7 +158,7 @@ private void runPcAndBlockRecordsOverLimitIndex(int blockOver) {
log.debug(msg("{} over block limit of {}, blocking...", index, blockOver));
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
throw new RuntimeException(e);
throw new FakeRuntimeException(e);
}
}
});
@@ -7,6 +7,7 @@
import com.google.common.truth.StringSubject;
import io.confluent.csid.utils.JavaUtils;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.PollContext;
@@ -368,7 +369,7 @@ private List<PollContext<String, String>> runPcUntilOffset(OffsetResetStrategy o
log.debug("Exceptional offset {} succeeded", thisOffset);
} else if (thisOffset >= succeedUpToOffset) {
log.debug("Failing on {}", thisOffset);
throw new RuntimeException("Failing on " + thisOffset);
throw new FakeRuntimeException("Failing on " + thisOffset);
} else {
log.debug("Succeeded {}: {}", thisOffset, pollContext.getSingleRecord());
succeededOffsets.add(pollContext);
@@ -4,11 +4,13 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.experimental.StandardException;

/**
* Used for testing error handling - easier to identify than a plan exception.
* Used for testing error handling - easier to identify than a plain exception.
*
* @author Antony Stubbs
*/
public class FakeRuntimeException extends RuntimeException {
public FakeRuntimeException(String msg) {
super(msg);
}
@StandardException
public class FakeRuntimeException extends PCRetriableException {
}
@@ -71,7 +71,7 @@ public void failingActionNothingCommitted(CommitMode commitMode) {
setupParallelConsumerInstance(commitMode);

parallelConsumer.poll((ignore) -> {
throw new RuntimeException("My user's function error");
throw new FakeRuntimeException("My user's function error");
});

// let it process
@@ -108,7 +108,7 @@ void offsetsAreNeverCommittedForMessagesStillInFlightSimplest(CommitMode commitM

// finish processing only msg 1
parallelConsumer.poll(context -> {
log.error("msg: {}", context);
log.debug("msg: {}", context);
startBarrierLatch.countDown();
int offset = (int) context.offset();
LatchTestUtils.awaitLatch(locks, offset);
6 changes: 5 additions & 1 deletion src/docs/README_TEMPLATE.adoc
@@ -641,7 +641,11 @@ Ordering guarantees will always be adhered to, regardless of failure.
A failure is denoted by *any* exception being thrown from the user's processing function.
The system catches these exceptions, logs them and replaces the record in the queue for processing later.
All types of Exceptions thrown are considered retriable.
To not retry a record, do not throw an exception from your processing fuction.
To not retry a record, do not throw an exception from your processing function.

TIP:: To avoid the system logging an error, throw an exception which extends PCRetriableException.

TIP:: If there was an error processing a record, and you'd like to skip it - do not throw an exception, and the system will mark the record as succeeded.

If for some reason you want to proactively fail a record, without relying on some other system throwing an exception which you don't catch - simply throw an exception of your own design, which the system will treat the same way.

