feature: Retry count available in header
astubbs committed Feb 15, 2022
1 parent f369399 commit 4f03c2a
Showing 4 changed files with 20 additions and 15 deletions.
14 changes: 6 additions & 8 deletions README.adoc
@@ -474,7 +474,8 @@ You can also optionally provide a callback function to be run after the message(
.Usage - print message content out to the console in parallel
[source,java,indent=0]
    parallelConsumer.pollAndProduce(record -> {
-       var result = processBrokerRecord(record);
+       int failedCount = ParallelConsumerRecord.getFailedCount(record);
+       var result = processBrokerRecord(record);
        return new ProducerRecord<>(outputTopic, record.key(), result.payload);
    }, consumeProduceResult -> {
        log.debug("Message {} saved to broker at offset {}",
@@ -691,24 +692,21 @@ The system will treat this as a record processing success, mark the record as co

A user may choose to skip a record, for example if it has been retried too many times, or if the record is invalid or doesn't need processing.

+The number of failed attempts for a message can be retrieved through the API; see below for an example.

Implementing a https://github.com/confluentinc/parallel-consumer/issues/196[max retries feature] as a part of the system is planned.

.Example of skipping a record after a maximum number of retries is reached
[source,java,indent=0]
----
    final int maxRetries = 10;
-   final Map<ConsumerRecord<String, String>, Long> retriesCount = new ConcurrentHashMap<>();
    pc.poll(consumerRecord -> {
-       Long retryCount = retriesCount.computeIfAbsent(consumerRecord, ignore -> 0L);
+       int retryCount = ParallelConsumerRecord.getFailedCount(consumerRecord);
        if (retryCount < maxRetries) {
            processRecord(consumerRecord);
-           // no exception, so completed - remove from map
-           retriesCount.remove(consumerRecord);
        } else {
-           log.warn("Retry count {} exceeded max of {} for record {}", retryCount, maxRetries, consumerRecord);
-           // giving up, remove from map
-           retriesCount.remove(consumerRecord);
+           log.warn("Retry count {} exceeded max of {} for record, skipping. {}", retryCount, maxRetries, consumerRecord);
        }
    });
----
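
Note that the failed count only grows when the user function throws: a record whose handler completes normally is treated as a success and is not retried. A minimal sketch of the failure path that feeds the counter read above (processRecord and the exception wrapping are illustrative, not part of this commit):

[source,java,indent=0]
----
pc.poll(consumerRecord -> {
    try {
        processRecord(consumerRecord);
    } catch (Exception e) {
        // Rethrowing marks the record as failed; the framework will retry it later,
        // and ParallelConsumerRecord.getFailedCount will then report the accumulated attempts.
        throw new RuntimeException("Failed to process record with key " + consumerRecord.key(), e);
    }
});
----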
@@ -11,8 +11,18 @@

import java.nio.ByteBuffer;

+/**
+ * Utility methods for Consumer Records
+ */
public class ParallelConsumerRecord {

+    /**
+     * Get the number of failed attempts for a record.
+     *
+     * @param record the record
+     * @return the number of failed attempts
+     */
+    // TODO consider making this information available through a read-only ConsumerRecord wrapper that exposes only public API info (as WorkContainer is very internal and writeable)
    public static int getFailedCount(final ConsumerRecord<String, String> record) {
        Headers headers = record.headers();
        Header header = headers.lastHeader(WorkContainer.FAILED_COUNT_HEADER);
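
The body of getFailedCount is truncated above; below is a minimal, self-contained sketch of how the header might be decoded, and how the framework might write it. The header key string and the 4-byte int encoding via ByteBuffer are assumptions for illustration; the real key constant lives in WorkContainer and its value is not shown in this diff.

[source,java,indent=0]
----
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import java.nio.ByteBuffer;

public class FailedCountHeaderSketch {

    // Hypothetical header key; the real constant is WorkContainer.FAILED_COUNT_HEADER (value not shown in this diff).
    static final String FAILED_COUNT_HEADER = "pc-failed-count";

    // Read the failed count from the record's headers, defaulting to 0 when the header is absent
    // (i.e. the record has never failed). Assumes the value is a 4-byte int written via ByteBuffer.
    static int readFailedCount(ConsumerRecord<String, String> record) {
        Header header = record.headers().lastHeader(FAILED_COUNT_HEADER);
        if (header == null) {
            return 0;
        }
        return ByteBuffer.wrap(header.value()).getInt();
    }

    // Write side: how the retry count might be stamped onto a record's headers before it is
    // handed back to user code for another attempt.
    static void writeFailedCount(Headers headers, int failedCount) {
        byte[] encoded = ByteBuffer.allocate(Integer.BYTES).putInt(failedCount).array();
        headers.remove(FAILED_COUNT_HEADER);
        headers.add(FAILED_COUNT_HEADER, encoded);
    }
}
----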
@@ -135,18 +135,13 @@ void maxRetries() {
ParallelStreamProcessor<String, String> pc = ParallelStreamProcessor.createEosStreamProcessor(null);
// tag::maxRetries[]
        final int maxRetries = 10;
-       final Map<ConsumerRecord<String, String>, Long> retriesCount = new ConcurrentHashMap<>();

        pc.poll(consumerRecord -> {
-           Long retryCount = retriesCount.computeIfAbsent(consumerRecord, ignore -> 0L);
+           int retryCount = ParallelConsumerRecord.getFailedCount(consumerRecord);
            if (retryCount < maxRetries) {
                processRecord(consumerRecord);
-               // no exception, so completed - remove from map
-               retriesCount.remove(consumerRecord);
            } else {
-               log.warn("Retry count {} exceeded max of {} for record {}", retryCount, maxRetries, consumerRecord);
-               // giving up, remove from map
-               retriesCount.remove(consumerRecord);
+               log.warn("Retry count {} exceeded max of {} for record, skipping. {}", retryCount, maxRetries, consumerRecord);
            }
        });
// end::maxRetries[]
2 changes: 2 additions & 0 deletions src/docs/README.adoc
@@ -604,6 +604,8 @@ The system will treat this as a record processing success, mark the record as co

A user may choose to skip a record, for example if it has been retried too many times, or if the record is invalid or doesn't need processing.

+The number of failed attempts for a message can be retrieved through the API; see below for an example.

Implementing a https://github.com/confluentinc/parallel-consumer/issues/196[max retries feature] as a part of the system is planned.

.Example of skipping a record after a maximum number of retries is reached
