
Commit 45aaf80

Implement foundational API for parallel-consumer based Kafka processors
Decoupled from #509. This merely adds an API on top of which processors can be implemented; we can migrate processors one-by-one from Kafka Streams to this API. The majority of this work was already done in #509, but it got out of date due to changed priorities. At the very least, said PR is good to take inspiration from.

Relates to DependencyTrack/hyades#346
Relates to DependencyTrack/hyades#901
Relates to DependencyTrack/hyades#907

Signed-off-by: nscuro <[email protected]>
1 parent f906f65 commit 45aaf80

17 files changed, +1100 −0 lines changed

pom.xml (+8 lines)

@@ -100,6 +100,7 @@
         <lib.kafka-junit.version>3.6.0</lib.kafka-junit.version>
         <lib.micrometer-jvm-extras.version>0.2.2</lib.micrometer-jvm-extras.version>
         <lib.packageurl.version>1.4.1</lib.packageurl.version>
+        <lib.parallel-consumer.version>0.5.2.8</lib.parallel-consumer.version>
         <lib.pebble.version>3.2.0</lib.pebble.version>
         <lib.protobuf-java.version>3.25.2</lib.protobuf-java.version>
         <lib.testcontainers.version>1.18.3</lib.testcontainers.version>
@@ -299,6 +300,13 @@
             <artifactId>packageurl-java</artifactId>
             <version>${lib.packageurl.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>io.confluent.parallelconsumer</groupId>
+            <artifactId>parallel-consumer-core</artifactId>
+            <version>${lib.parallel-consumer.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
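
Note: parallel-consumer-core is Confluent's Parallel Consumer, the library the new processor API builds on (see the PCRetriableException usage further down). For orientation, a minimal stand-alone usage sketch of the library follows. It is not taken from this commit; the broker address, group id, topic name, concurrency setting, and the getSingleConsumerRecord() accessor reflect my reading of the library's public API and should be treated as assumptions.

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.List;
import java.util.Map;

public class ParallelConsumerSketch {

    public static void main(final String[] args) {
        // Plain Kafka consumer over raw bytes; deserialization is handled downstream,
        // mirroring how the new API keeps byte[] records until a strategy deserializes them.
        final var consumer = new KafkaConsumer<byte[], byte[]>(Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",   // illustrative
                ConsumerConfig.GROUP_ID_CONFIG, "example-group",             // illustrative
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()));

        // KEY ordering allows concurrency beyond the partition count while preserving per-key order.
        final ParallelConsumerOptions<byte[], byte[]> options = ParallelConsumerOptions.<byte[], byte[]>builder()
                .consumer(consumer)
                .ordering(ProcessingOrder.KEY)
                .maxConcurrency(3)
                .build();

        final ParallelStreamProcessor<byte[], byte[]> streamProcessor =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        streamProcessor.subscribe(List.of("example.topic"));

        // The handler is invoked concurrently; throwing PCRetriableException triggers a retry.
        streamProcessor.poll(pollCtx -> System.out.println(pollCtx.getSingleConsumerRecord().offset()));
    }

}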
New file: src/main/java/org/dependencytrack/common/MdcKeys.java (+16 lines)

package org.dependencytrack.common;

/**
 * Common fields for use with SLF4J's {@link org.slf4j.MDC}.
 */
public final class MdcKeys {

    public static final String MDC_KAFKA_RECORD_TOPIC = "kafkaRecordTopic";
    public static final String MDC_KAFKA_RECORD_PARTITION = "kafkaRecordPartition";
    public static final String MDC_KAFKA_RECORD_OFFSET = "kafkaRecordOffset";
    public static final String MDC_KAFKA_RECORD_KEY = "kafkaRecordKey";

    private MdcKeys() {
    }

}
New file: src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java (+30 lines)

package org.dependencytrack.event.kafka.processor;

import alpine.common.logging.Logger;
import org.dependencytrack.event.kafka.processor.api.ProcessorManager;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

public class ProcessorInitializer implements ServletContextListener {

    private static final Logger LOGGER = Logger.getLogger(ProcessorInitializer.class);

    static final ProcessorManager PROCESSOR_MANAGER = new ProcessorManager();

    @Override
    public void contextInitialized(final ServletContextEvent event) {
        LOGGER.info("Initializing processors");

        // TODO: Register processor here!

        PROCESSOR_MANAGER.startAll();
    }

    @Override
    public void contextDestroyed(final ServletContextEvent event) {
        LOGGER.info("Stopping processors");
        PROCESSOR_MANAGER.close();
    }

}
New file: src/main/java/org/dependencytrack/event/kafka/processor/ProcessorsHealthCheck.java (+17 lines)

package org.dependencytrack.event.kafka.processor;

import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.Liveness;

import static org.dependencytrack.event.kafka.processor.ProcessorInitializer.PROCESSOR_MANAGER;

@Liveness
public class ProcessorsHealthCheck implements HealthCheck {

    @Override
    public HealthCheckResponse call() {
        return PROCESSOR_MANAGER.probeHealth();
    }

}
New file: src/main/java/org/dependencytrack/event/kafka/processor/api/AbstractProcessingStrategy.java (+87 lines)

package org.dependencytrack.event.kafka.processor.api;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.datanucleus.api.jdo.exceptions.ConnectionInUseException;
import org.datanucleus.store.query.QueryInterruptedException;
import org.dependencytrack.event.kafka.processor.exception.RetryableProcessingException;
import org.postgresql.util.PSQLState;

import javax.jdo.JDOOptimisticVerificationException;
import java.net.SocketTimeoutException;
import java.sql.SQLException;
import java.sql.SQLTransientConnectionException;
import java.sql.SQLTransientException;
import java.util.List;
import java.util.concurrent.TimeoutException;

/**
 * An abstract {@link ProcessingStrategy} that provides various shared functionality.
 *
 * @param <K> Type of the {@link ConsumerRecord} key
 * @param <V> Type of the {@link ConsumerRecord} value
 */
abstract class AbstractProcessingStrategy<K, V> implements ProcessingStrategy {

    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;

    AbstractProcessingStrategy(final Serde<K> keySerde, final Serde<V> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    /**
     * @param record The {@link ConsumerRecord} to deserialize key and value of
     * @return A {@link ConsumerRecord} with deserialized key and value
     * @throws SerializationException When deserializing the {@link ConsumerRecord} failed
     */
    ConsumerRecord<K, V> deserialize(final ConsumerRecord<byte[], byte[]> record) {
        final K deserializedKey;
        final V deserializedValue;
        try {
            deserializedKey = keySerde.deserializer().deserialize(record.topic(), record.key());
            deserializedValue = valueSerde.deserializer().deserialize(record.topic(), record.value());
        } catch (RuntimeException e) {
            if (e instanceof SerializationException) {
                throw e;
            }

            throw new SerializationException(e);
        }

        return new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
                record.timestamp(), record.timestampType(), record.serializedKeySize(), record.serializedValueSize(),
                deserializedKey, deserializedValue, record.headers(), record.leaderEpoch());
    }

    private static final List<Class<? extends Exception>> KNOWN_TRANSIENT_EXCEPTIONS = List.of(
            ConnectTimeoutException.class,
            ConnectionInUseException.class,
            JDOOptimisticVerificationException.class,
            QueryInterruptedException.class,
            SocketTimeoutException.class,
            SQLTransientException.class,
            SQLTransientConnectionException.class,
            TimeoutException.class
    );

    boolean isRetryableException(final Throwable throwable) {
        if (throwable instanceof RetryableProcessingException) {
            return true;
        }

        final boolean isKnownTransientException = ExceptionUtils.getThrowableList(throwable).stream()
                .anyMatch(cause -> KNOWN_TRANSIENT_EXCEPTIONS.contains(cause.getClass()));
        if (isKnownTransientException) {
            return true;
        }

        return ExceptionUtils.getRootCause(throwable) instanceof final SQLException se
                && PSQLState.isConnectionError(se.getSQLState());
    }

}
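
In short, a failure is considered retryable if it is a RetryableProcessingException, if any exception in its cause chain is one of the known transient types above, or if its root cause is a PostgreSQL connection error. A small JUnit 5-style sketch of that classification follows; it is not part of this commit, the project's actual test setup may differ, and the RetryableProcessingException(Throwable) constructor is assumed because that class is not shown in this diff.

package org.dependencytrack.event.kafka.processor.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.dependencytrack.event.kafka.processor.exception.RetryableProcessingException;
import org.junit.jupiter.api.Test;

import java.net.SocketTimeoutException;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class RetryClassificationSketchTest {

    // Minimal concrete subclass; processRecords is irrelevant for this sketch.
    private static final class NoopStrategy extends AbstractProcessingStrategy<String, String> {

        private NoopStrategy() {
            super(Serdes.String(), Serdes.String());
        }

        @Override
        public void processRecords(final List<ConsumerRecord<byte[], byte[]>> records) {
            // No-op.
        }
    }

    @Test
    void shouldClassifyRetryableExceptions() {
        final var strategy = new NoopStrategy();

        // A known transient type anywhere in the cause chain makes the failure retryable.
        assertTrue(strategy.isRetryableException(new RuntimeException(new SocketTimeoutException("read timed out"))));

        // Processors can opt in explicitly by throwing RetryableProcessingException (cause constructor assumed).
        assertTrue(strategy.isRetryableException(new RetryableProcessingException(new IllegalStateException())));

        // Everything else is treated as non-retryable and the records are skipped.
        assertFalse(strategy.isRetryableException(new IllegalArgumentException("bad input")));
    }

}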
New file: src/main/java/org/dependencytrack/event/kafka/processor/api/BatchProcessingStrategy.java (+69 lines)

package org.dependencytrack.event.kafka.processor.api;

import alpine.common.logging.Logger;
import io.confluent.parallelconsumer.PCRetriableException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.dependencytrack.common.MdcKeys;
import org.dependencytrack.event.kafka.processor.exception.ProcessingException;
import org.slf4j.MDC;

import java.util.ArrayList;
import java.util.List;

/**
 * A {@link ProcessingStrategy} that processes records in batches.
 *
 * @param <K> Type of the {@link ConsumerRecord} key
 * @param <V> Type of the {@link ConsumerRecord} value
 */
class BatchProcessingStrategy<K, V> extends AbstractProcessingStrategy<K, V> {

    private static final Logger LOGGER = Logger.getLogger(BatchProcessingStrategy.class);

    private final BatchProcessor<K, V> batchProcessor;

    BatchProcessingStrategy(final BatchProcessor<K, V> batchProcessor,
                            final Serde<K> keySerde, final Serde<V> valueSerde) {
        super(keySerde, valueSerde);
        this.batchProcessor = batchProcessor;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void processRecords(final List<ConsumerRecord<byte[], byte[]>> records) {
        final var deserializedRecords = new ArrayList<ConsumerRecord<K, V>>(records.size());
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            try (var ignoredMdcKafkaRecordTopic = MDC.putCloseable(MdcKeys.MDC_KAFKA_RECORD_TOPIC, record.topic());
                 var ignoredMdcKafkaRecordPartition = MDC.putCloseable(MdcKeys.MDC_KAFKA_RECORD_PARTITION, String.valueOf(record.partition()));
                 var ignoredMdcKafkaRecordOffset = MDC.putCloseable(MdcKeys.MDC_KAFKA_RECORD_OFFSET, String.valueOf(record.offset()))) {
                deserializedRecords.add(deserialize(record));
            } catch (SerializationException e) {
                // TODO: Consider supporting error handlers, e.g. to send record to DLT.
                LOGGER.error("Failed to deserialize record; Skipping", e);
            }
        }

        if (deserializedRecords.isEmpty()) {
            LOGGER.warn("All of the %d records in this batch failed to be deserialized".formatted(records.size()));
            return;
        }

        try {
            batchProcessor.process(deserializedRecords);
        } catch (ProcessingException | RuntimeException e) {
            if (isRetryableException(e)) {
                LOGGER.warn("Encountered retryable exception while processing %d records".formatted(deserializedRecords.size()), e);
                throw new PCRetriableException(e);
            }

            LOGGER.error("Encountered non-retryable exception while processing %d records; Skipping".formatted(deserializedRecords.size()), e);
            // TODO: Consider supporting error handlers, e.g. to send records to DLT.
            // Skip records to avoid poison-pill scenario.
        }
    }

}
New file: src/main/java/org/dependencytrack/event/kafka/processor/api/BatchProcessor.java (+26 lines)

package org.dependencytrack.event.kafka.processor.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dependencytrack.event.kafka.processor.exception.ProcessingException;

import java.util.List;

/**
 * A processor of {@link ConsumerRecord} batches.
 *
 * @param <K> Type of the {@link ConsumerRecord} key
 * @param <V> Type of the {@link ConsumerRecord} value
 */
public interface BatchProcessor<K, V> {

    /**
     * Process a batch of {@link ConsumerRecord}s.
     * <p>
     * This method may be called by multiple threads concurrently and thus MUST be thread safe!
     *
     * @param records Batch of {@link ConsumerRecord}s to process
     * @throws ProcessingException When consuming the batch of {@link ConsumerRecord}s failed
     */
    void process(final List<ConsumerRecord<K, V>> records) throws ProcessingException;

}
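
To illustrate the contract, here is a hypothetical BatchProcessor implementation that collapses each batch into a single store write. The processor, its EventStore collaborator, and the ProcessingException(Throwable) constructor are assumptions made for the sketch; only the interface itself comes from this commit.

package org.dependencytrack.event.kafka.processor.example; // hypothetical package

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dependencytrack.event.kafka.processor.api.BatchProcessor;
import org.dependencytrack.event.kafka.processor.exception.ProcessingException;

import java.util.List;

/**
 * Hypothetical batch processor that persists all record values of a batch in one go,
 * trading per-record latency for fewer round trips to the data store.
 */
public class ExampleBatchPersistenceProcessor implements BatchProcessor<String, String> {

    private final EventStore eventStore; // hypothetical collaborator

    public ExampleBatchPersistenceProcessor(final EventStore eventStore) {
        this.eventStore = eventStore;
    }

    @Override
    public void process(final List<ConsumerRecord<String, String>> records) throws ProcessingException {
        // Must be thread safe: the strategy may invoke this concurrently for different batches.
        final List<String> payloads = records.stream()
                .map(ConsumerRecord::value)
                .toList();
        try {
            eventStore.saveAll(payloads);
        } catch (Exception e) {
            // Wrapping lets BatchProcessingStrategy decide whether the failure is retryable.
            throw new ProcessingException(e);
        }
    }

    /** Hypothetical store abstraction, only here to keep the sketch self-contained. */
    public interface EventStore {
        void saveAll(List<String> payloads) throws Exception;
    }

}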
New file: src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessingStrategy.java (+16 lines)

package org.dependencytrack.event.kafka.processor.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.List;

interface ProcessingStrategy {

    /**
     * Process zero or more {@link ConsumerRecord}s.
     *
     * @param records The {@link ConsumerRecord}s to process
     */
    void processRecords(final List<ConsumerRecord<byte[], byte[]>> records);

}
New file: src/main/java/org/dependencytrack/event/kafka/processor/api/Processor.java (+24 lines)

package org.dependencytrack.event.kafka.processor.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dependencytrack.event.kafka.processor.exception.ProcessingException;

/**
 * A processor of individual {@link ConsumerRecord}s.
 *
 * @param <K> Type of the {@link ConsumerRecord} key
 * @param <V> Type of the {@link ConsumerRecord} value
 */
public interface Processor<K, V> {

    /**
     * Process a {@link ConsumerRecord}.
     * <p>
     * This method may be called by multiple threads concurrently and thus MUST be thread safe!
     *
     * @param record The {@link ConsumerRecord} to process
     * @throws ProcessingException When processing the {@link ConsumerRecord} failed
     */
    void process(final ConsumerRecord<K, V> record) throws ProcessingException;

}
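
And the single-record counterpart: a hypothetical Processor implementation that performs one outbound call per record and maps transient failures onto the retry semantics described earlier. The dispatch helper and the exception cause constructors are assumptions, not part of this diff.

package org.dependencytrack.event.kafka.processor.example; // hypothetical package

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dependencytrack.event.kafka.processor.api.Processor;
import org.dependencytrack.event.kafka.processor.exception.ProcessingException;
import org.dependencytrack.event.kafka.processor.exception.RetryableProcessingException;

import java.net.SocketTimeoutException;

/**
 * Hypothetical single-record processor. Kept stateless so that concurrent invocation
 * by multiple threads (as documented on Processor#process) is safe.
 */
public class ExampleNotificationProcessor implements Processor<String, String> {

    @Override
    public void process(final ConsumerRecord<String, String> record) throws ProcessingException {
        try {
            dispatch(record.key(), record.value());
        } catch (SocketTimeoutException e) {
            // Signal an explicitly retryable failure (cause constructor assumed).
            throw new RetryableProcessingException(e);
        } catch (Exception e) {
            // Everything else is wrapped; the strategy still inspects the cause chain for transient types.
            throw new ProcessingException(e);
        }
    }

    private void dispatch(final String key, final String value) throws Exception {
        // Placeholder for the actual per-record work, e.g. an outbound HTTP call.
    }

}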
