Skip to content

Commit 3e655f7

Browse files
committed
Migrate RepositoryMetaResultProcessor from Kafka Streams to Parallel Consumer
Depends on #552. Relates to DependencyTrack/hyades#346. Relates to DependencyTrack/hyades#901. Relates to DependencyTrack/hyades#907.

Signed-off-by: nscuro <[email protected]>
1 parent 55f9850 commit 3e655f7

File tree

6 files changed

+81
-131
lines changed

6 files changed

+81
-131
lines changed

src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.dependencytrack.event.kafka.processor;
22

33
import alpine.common.logging.Logger;
4+
import org.dependencytrack.event.kafka.KafkaTopics;
45
import org.dependencytrack.event.kafka.processor.api.ProcessorManager;
56

67
import javax.servlet.ServletContextEvent;
@@ -16,7 +17,8 @@ public class ProcessorInitializer implements ServletContextListener {
1617
public void contextInitialized(final ServletContextEvent event) {
1718
LOGGER.info("Initializing processors");
1819

19-
// TODO: Register processor here!
20+
PROCESSOR_MANAGER.registerProcessor(RepositoryMetaResultProcessor.PROCESSOR_NAME,
21+
KafkaTopics.REPO_META_ANALYSIS_RESULT, new RepositoryMetaResultProcessor());
2022

2123
PROCESSOR_MANAGER.startAll();
2224
}

src/main/java/org/dependencytrack/event/kafka/streams/processor/RepositoryMetaResultProcessor.java → src/main/java/org/dependencytrack/event/kafka/processor/RepositoryMetaResultProcessor.java (renamed)

+14-19
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
1-
package org.dependencytrack.event.kafka.streams.processor;
1+
package org.dependencytrack.event.kafka.processor;
22

33
import alpine.common.logging.Logger;
4-
import alpine.common.metrics.Metrics;
54
import com.github.packageurl.MalformedPackageURLException;
65
import com.github.packageurl.PackageURL;
7-
import io.micrometer.core.instrument.Timer;
86
import org.apache.commons.lang3.exception.ExceptionUtils;
9-
import org.apache.kafka.streams.processor.api.Processor;
10-
import org.apache.kafka.streams.processor.api.Record;
7+
import org.apache.kafka.clients.consumer.ConsumerRecord;
8+
import org.dependencytrack.event.kafka.processor.api.Processor;
9+
import org.dependencytrack.event.kafka.processor.exception.ProcessingException;
1110
import org.dependencytrack.model.FetchStatus;
1211
import org.dependencytrack.model.IntegrityMetaComponent;
1312
import org.dependencytrack.model.RepositoryMetaComponent;
@@ -29,16 +28,14 @@
2928
/**
3029
* A {@link Processor} responsible for processing result of component repository meta analyses.
3130
*/
32-
public class RepositoryMetaResultProcessor implements Processor<String, AnalysisResult, Void, Void> {
31+
public class RepositoryMetaResultProcessor implements Processor<String, AnalysisResult> {
32+
33+
static final String PROCESSOR_NAME = "repo.meta.analysis.result";
3334

3435
private static final Logger LOGGER = Logger.getLogger(RepositoryMetaResultProcessor.class);
35-
private static final Timer TIMER = Timer.builder("repo_meta_result_processing")
36-
.description("Time taken to process repository meta analysis results")
37-
.register(Metrics.getRegistry());
3836

3937
@Override
40-
public void process(final Record<String, AnalysisResult> record) {
41-
final Timer.Sample timerSample = Timer.start();
38+
public void process(final ConsumerRecord<String, AnalysisResult> record) throws ProcessingException {
4239
if (!isRecordValid(record)) {
4340
return;
4441
}
@@ -49,13 +46,11 @@ public void process(final Record<String, AnalysisResult> record) {
4946
performIntegrityCheck(integrityMetaComponent, record.value(), qm);
5047
}
5148
} catch (Exception e) {
52-
LOGGER.error("An unexpected error occurred while processing record %s".formatted(record), e);
53-
} finally {
54-
timerSample.stop(TIMER);
49+
throw new ProcessingException(e);
5550
}
5651
}
5752

58-
private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager queryManager, final Record<String, AnalysisResult> record) throws MalformedPackageURLException {
53+
private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager queryManager, final ConsumerRecord<String, AnalysisResult> record) throws MalformedPackageURLException {
5954
final AnalysisResult result = record.value();
6055
PackageURL purl = new PackageURL(result.getComponent().getPurl());
6156
if (result.hasIntegrityMeta()) {
@@ -66,7 +61,7 @@ private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager q
6661
}
6762
}
6863

69-
private void synchronizeRepositoryMetadata(final QueryManager queryManager, final Record<String, AnalysisResult> record) throws Exception {
64+
private void synchronizeRepositoryMetadata(final QueryManager queryManager, final ConsumerRecord<String, AnalysisResult> record) throws Exception {
7065
PersistenceManager pm = queryManager.getPersistenceManager();
7166
final AnalysisResult result = record.value();
7267
PackageURL purl = new PackageURL(result.getComponent().getPurl());
@@ -104,7 +99,7 @@ private void synchronizeRepositoryMetadata(final QueryManager queryManager, fina
10499
}
105100
}
106101

107-
private RepositoryMetaComponent createRepositoryMetaResult(Record<String, AnalysisResult> incomingAnalysisResultRecord, PersistenceManager pm, PackageURL purl) throws Exception {
102+
private RepositoryMetaComponent createRepositoryMetaResult(ConsumerRecord<String, AnalysisResult> incomingAnalysisResultRecord, PersistenceManager pm, PackageURL purl) throws Exception {
108103
final AnalysisResult result = incomingAnalysisResultRecord.value();
109104
if (result.hasLatestVersion()) {
110105
try (final Query<RepositoryMetaComponent> query = pm.newQuery(RepositoryMetaComponent.class)) {
@@ -145,7 +140,7 @@ private RepositoryMetaComponent createRepositoryMetaResult(Record<String, Analys
145140
}
146141
}
147142

148-
private IntegrityMetaComponent synchronizeIntegrityMetaResult(final Record<String, AnalysisResult> incomingAnalysisResultRecord, QueryManager queryManager, PackageURL purl) {
143+
private IntegrityMetaComponent synchronizeIntegrityMetaResult(final ConsumerRecord<String, AnalysisResult> incomingAnalysisResultRecord, QueryManager queryManager, PackageURL purl) {
149144
final AnalysisResult result = incomingAnalysisResultRecord.value();
150145
IntegrityMetaComponent persistentIntegrityMetaComponent = queryManager.getIntegrityMetaComponent(purl.toString());
151146
if (persistentIntegrityMetaComponent != null && persistentIntegrityMetaComponent.getStatus() != null && persistentIntegrityMetaComponent.getStatus().equals(FetchStatus.PROCESSED)) {
@@ -180,7 +175,7 @@ private IntegrityMetaComponent synchronizeIntegrityMetaResult(final Record<Strin
180175
return queryManager.updateIntegrityMetaComponent(persistentIntegrityMetaComponent);
181176
}
182177

183-
private static boolean isRecordValid(final Record<String, AnalysisResult> record) {
178+
private static boolean isRecordValid(final ConsumerRecord<String, AnalysisResult> record) {
184179
final AnalysisResult result = record.value();
185180
if (!result.hasComponent()) {
186181
LOGGER.warn("""

src/main/java/org/dependencytrack/event/kafka/streams/KafkaStreamsTopologyFactory.java

-7
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.dependencytrack.event.kafka.KafkaTopics;
2525
import org.dependencytrack.event.kafka.streams.processor.DelayedBomProcessedNotificationProcessor;
2626
import org.dependencytrack.event.kafka.streams.processor.MirrorVulnerabilityProcessor;
27-
import org.dependencytrack.event.kafka.streams.processor.RepositoryMetaResultProcessor;
2827
import org.dependencytrack.event.kafka.streams.processor.VulnerabilityScanResultProcessor;
2928
import org.dependencytrack.model.VulnerabilityScan;
3029
import org.dependencytrack.model.WorkflowState;
@@ -218,12 +217,6 @@ Topology createTopology() {
218217
Event.dispatch(policyEvaluationEvent);
219218
}, Named.as("trigger_policy_evaluation"));
220219

221-
streamsBuilder
222-
.stream(KafkaTopics.REPO_META_ANALYSIS_RESULT.name(),
223-
Consumed.with(KafkaTopics.REPO_META_ANALYSIS_RESULT.keySerde(), KafkaTopics.REPO_META_ANALYSIS_RESULT.valueSerde())
224-
.withName("consume_from_%s_topic".formatted(KafkaTopics.REPO_META_ANALYSIS_RESULT.name())))
225-
.process(RepositoryMetaResultProcessor::new, Named.as("process_repo_meta_analysis_result"));
226-
227220
streamsBuilder
228221
.stream(KafkaTopics.NEW_VULNERABILITY.name(),
229222
Consumed.with(KafkaTopics.NEW_VULNERABILITY.keySerde(), KafkaTopics.NEW_VULNERABILITY.valueSerde())

src/main/resources/application.properties

+8
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,14 @@ kafka.streams.transient.processing.exception.threshold.interval=PT30M
491491
# Refer to https://kafka.apache.org/documentation/#consumerconfigs for available options.
492492
# alpine.kafka.processor.<name>.consumer.<consumer.config.name>=
493493

494+
alpine.kafka.processor.repo.meta.analysis.result.max.concurrency=-1
495+
alpine.kafka.processor.repo.meta.analysis.result.processing.order=key
496+
alpine.kafka.processor.repo.meta.analysis.result.retry.initial.delay.ms=1000
497+
alpine.kafka.processor.repo.meta.analysis.result.retry.multiplier=2
498+
alpine.kafka.processor.repo.meta.analysis.result.retry.randomization.factor=0.3
499+
alpine.kafka.processor.repo.meta.analysis.result.retry.max.delay.ms=180000
500+
alpine.kafka.processor.repo.meta.analysis.result.consumer.group.id=dtrack-apiserver-processor
501+
494502
# Scheduling tasks after 3 minutes (3*60*1000) of starting application
495503
task.scheduler.initial.delay=180000
496504

0 commit comments

Comments (0)