Commit e6ad31f
Migrate RepositoryMetaResultProcessor from Kafka Streams to Parallel Consumer
Depends on #552
Relates to DependencyTrack/hyades#346
Relates to DependencyTrack/hyades#901
Relates to DependencyTrack/hyades#907

Signed-off-by: nscuro <[email protected]>
1 parent 8ff235b commit e6ad31f
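The migration replaces the Kafka Streams Processor<KIn, VIn, KOut, VOut> API with the single-record processor abstraction introduced in #552, which is driven by the Parallel Consumer instead of a streams topology. The authoritative interface definition is part of #552 and not shown in this diff; based on how RepositoryMetaResultProcessor uses it below, it can be sketched roughly as follows (treat this as a reconstruction, not the actual source):

package org.dependencytrack.event.kafka.processor.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dependencytrack.event.kafka.processor.exception.ProcessingException;

// Sketch only: reconstructed from the usages in this commit; the real
// definition is introduced by #552 and may differ in details.
public interface Processor<K, V> {

    // Handles exactly one consumed record. Implementations signal failures by
    // throwing ProcessingException rather than logging and swallowing them.
    void process(ConsumerRecord<K, V> record) throws ProcessingException;
}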

File tree

6 files changed, +84 -130 lines changed

src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java (+2)

@@ -19,6 +19,8 @@ public void contextInitialized(final ServletContextEvent event) {
 
         PROCESSOR_MANAGER.registerProcessor(VulnerabilityMirrorProcessor.PROCESSOR_NAME,
                 KafkaTopics.NEW_VULNERABILITY, new VulnerabilityMirrorProcessor());
+        PROCESSOR_MANAGER.registerProcessor(RepositoryMetaResultProcessor.PROCESSOR_NAME,
+                KafkaTopics.REPO_META_ANALYSIS_RESULT, new RepositoryMetaResultProcessor());
 
         PROCESSOR_MANAGER.startAll();
     }
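Registration pairs a processor name, the topic it consumes, and a processor instance. Judging by the properties added further down, the name (repo.meta.analysis.result) also acts as the key under which the processor's alpine.kafka.processor.<name>.* configuration is looked up. Wiring in a further processor would presumably follow the same pattern; ExampleResultProcessor and KafkaTopics.EXAMPLE_RESULT below are made up for illustration:

// Hypothetical example, not part of this commit:
PROCESSOR_MANAGER.registerProcessor(ExampleResultProcessor.PROCESSOR_NAME,
        KafkaTopics.EXAMPLE_RESULT, new ExampleResultProcessor());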

src/main/java/org/dependencytrack/event/kafka/streams/processor/RepositoryMetaResultProcessor.java
renamed to src/main/java/org/dependencytrack/event/kafka/processor/RepositoryMetaResultProcessor.java (+14 -19)

@@ -1,13 +1,12 @@
-package org.dependencytrack.event.kafka.streams.processor;
+package org.dependencytrack.event.kafka.processor;
 
 import alpine.common.logging.Logger;
-import alpine.common.metrics.Metrics;
 import com.github.packageurl.MalformedPackageURLException;
 import com.github.packageurl.PackageURL;
-import io.micrometer.core.instrument.Timer;
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.dependencytrack.event.kafka.processor.api.Processor;
+import org.dependencytrack.event.kafka.processor.exception.ProcessingException;
 import org.dependencytrack.model.FetchStatus;
 import org.dependencytrack.model.IntegrityMetaComponent;
 import org.dependencytrack.model.RepositoryMetaComponent;
@@ -29,16 +28,14 @@
 /**
  * A {@link Processor} responsible for processing result of component repository meta analyses.
  */
-public class RepositoryMetaResultProcessor implements Processor<String, AnalysisResult, Void, Void> {
+public class RepositoryMetaResultProcessor implements Processor<String, AnalysisResult> {
+
+    static final String PROCESSOR_NAME = "repo.meta.analysis.result";
 
     private static final Logger LOGGER = Logger.getLogger(RepositoryMetaResultProcessor.class);
-    private static final Timer TIMER = Timer.builder("repo_meta_result_processing")
-            .description("Time taken to process repository meta analysis results")
-            .register(Metrics.getRegistry());
 
     @Override
-    public void process(final Record<String, AnalysisResult> record) {
-        final Timer.Sample timerSample = Timer.start();
+    public void process(final ConsumerRecord<String, AnalysisResult> record) throws ProcessingException {
         if (!isRecordValid(record)) {
             return;
         }
@@ -49,13 +46,11 @@ public void process(final Record<String, AnalysisResult> record) {
                 performIntegrityCheck(integrityMetaComponent, record.value(), qm);
             }
         } catch (Exception e) {
-            LOGGER.error("An unexpected error occurred while processing record %s".formatted(record), e);
-        } finally {
-            timerSample.stop(TIMER);
+            throw new ProcessingException(e);
         }
     }
 
-    private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager queryManager, final Record<String, AnalysisResult> record) throws MalformedPackageURLException {
+    private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager queryManager, final ConsumerRecord<String, AnalysisResult> record) throws MalformedPackageURLException {
         final AnalysisResult result = record.value();
         PackageURL purl = new PackageURL(result.getComponent().getPurl());
         if (result.hasIntegrityMeta()) {
@@ -66,7 +61,7 @@ private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager q
         }
     }
 
-    private void synchronizeRepositoryMetadata(final QueryManager queryManager, final Record<String, AnalysisResult> record) throws Exception {
+    private void synchronizeRepositoryMetadata(final QueryManager queryManager, final ConsumerRecord<String, AnalysisResult> record) throws Exception {
         PersistenceManager pm = queryManager.getPersistenceManager();
         final AnalysisResult result = record.value();
         PackageURL purl = new PackageURL(result.getComponent().getPurl());
@@ -104,7 +99,7 @@ private void synchronizeRepositoryMetadata(final QueryManager queryManager, fina
         }
     }
 
-    private RepositoryMetaComponent createRepositoryMetaResult(Record<String, AnalysisResult> incomingAnalysisResultRecord, PersistenceManager pm, PackageURL purl) throws Exception {
+    private RepositoryMetaComponent createRepositoryMetaResult(ConsumerRecord<String, AnalysisResult> incomingAnalysisResultRecord, PersistenceManager pm, PackageURL purl) throws Exception {
         final AnalysisResult result = incomingAnalysisResultRecord.value();
         if (result.hasLatestVersion()) {
             try (final Query<RepositoryMetaComponent> query = pm.newQuery(RepositoryMetaComponent.class)) {
@@ -145,7 +140,7 @@ private RepositoryMetaComponent createRepositoryMetaResult(Record<String, Analys
         }
     }
 
-    private IntegrityMetaComponent synchronizeIntegrityMetaResult(final Record<String, AnalysisResult> incomingAnalysisResultRecord, QueryManager queryManager, PackageURL purl) {
+    private IntegrityMetaComponent synchronizeIntegrityMetaResult(final ConsumerRecord<String, AnalysisResult> incomingAnalysisResultRecord, QueryManager queryManager, PackageURL purl) {
         final AnalysisResult result = incomingAnalysisResultRecord.value();
         IntegrityMetaComponent persistentIntegrityMetaComponent = queryManager.getIntegrityMetaComponent(purl.toString());
         if (persistentIntegrityMetaComponent != null && persistentIntegrityMetaComponent.getStatus() != null && persistentIntegrityMetaComponent.getStatus().equals(FetchStatus.PROCESSED)) {
@@ -180,7 +175,7 @@ private IntegrityMetaComponent synchronizeIntegrityMetaResult(final Record<Strin
         return queryManager.updateIntegrityMetaComponent(persistentIntegrityMetaComponent);
     }
 
-    private static boolean isRecordValid(final Record<String, AnalysisResult> record) {
+    private static boolean isRecordValid(final ConsumerRecord<String, AnalysisResult> record) {
         final AnalysisResult result = record.value();
         if (!result.hasComponent()) {
             LOGGER.warn("""
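Apart from the changed types, the notable behavioural differences are that the Micrometer timer is gone and that unexpected exceptions are no longer logged and swallowed but rethrown as ProcessingException, presumably so that the processor framework can apply the retry settings configured in application.properties below. A minimal processor following the same pattern might look like the sketch below; ExampleProcessor and its handle method are hypothetical and only illustrate the contract:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dependencytrack.event.kafka.processor.api.Processor;
import org.dependencytrack.event.kafka.processor.exception.ProcessingException;

// Hypothetical sketch, assuming the Processor contract outlined earlier.
public class ExampleProcessor implements Processor<String, String> {

    static final String PROCESSOR_NAME = "example";

    @Override
    public void process(final ConsumerRecord<String, String> record) throws ProcessingException {
        try {
            handle(record.value());
        } catch (RuntimeException e) {
            // Surfacing the failure instead of swallowing it lets the
            // surrounding framework decide whether and when to retry.
            throw new ProcessingException(e);
        }
    }

    private void handle(final String value) {
        // Application-specific work would go here.
    }
}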

src/main/java/org/dependencytrack/event/kafka/streams/KafkaStreamsTopologyFactory.java (-7)

@@ -23,7 +23,6 @@
 import org.dependencytrack.event.ProjectPolicyEvaluationEvent;
 import org.dependencytrack.event.kafka.KafkaTopics;
 import org.dependencytrack.event.kafka.streams.processor.DelayedBomProcessedNotificationProcessor;
-import org.dependencytrack.event.kafka.streams.processor.RepositoryMetaResultProcessor;
 import org.dependencytrack.event.kafka.streams.processor.VulnerabilityScanResultProcessor;
 import org.dependencytrack.model.VulnerabilityScan;
 import org.dependencytrack.model.WorkflowState;
@@ -217,12 +216,6 @@ Topology createTopology() {
             Event.dispatch(policyEvaluationEvent);
         }, Named.as("trigger_policy_evaluation"));
 
-        streamsBuilder
-                .stream(KafkaTopics.REPO_META_ANALYSIS_RESULT.name(),
-                        Consumed.with(KafkaTopics.REPO_META_ANALYSIS_RESULT.keySerde(), KafkaTopics.REPO_META_ANALYSIS_RESULT.valueSerde())
-                                .withName("consume_from_%s_topic".formatted(KafkaTopics.REPO_META_ANALYSIS_RESULT.name())))
-                .process(RepositoryMetaResultProcessor::new, Named.as("process_repo_meta_analysis_result"));
-
         return streamsBuilder.build(streamsProperties);
     }

src/main/resources/application.properties (+12)

@@ -504,6 +504,18 @@ alpine.kafka.processor.vuln.mirror.retry.max.delay.ms=180000
 alpine.kafka.processor.vuln.mirror.consumer.group.id=dtrack-apiserver-processor
 alpine.kafka.processor.vuln.mirror.consumer.auto.offset.reset=earliest
 
+# Required
+# Configures the Kafka processor responsible for ingesting repository metadata
+# analysis results from the dtrack.repo-meta-analysis.result topic.
+alpine.kafka.processor.repo.meta.analysis.result.max.concurrency=-1
+alpine.kafka.processor.repo.meta.analysis.result.processing.order=key
+alpine.kafka.processor.repo.meta.analysis.result.retry.initial.delay.ms=1000
+alpine.kafka.processor.repo.meta.analysis.result.retry.multiplier=2
+alpine.kafka.processor.repo.meta.analysis.result.retry.randomization.factor=0.3
+alpine.kafka.processor.repo.meta.analysis.result.retry.max.delay.ms=180000
+alpine.kafka.processor.repo.meta.analysis.result.consumer.group.id=dtrack-apiserver-processor
+alpine.kafka.processor.repo.meta.analysis.result.consumer.auto.offset.reset=earliest
+
 # Scheduling tasks after 3 minutes (3*60*1000) of starting application
 task.scheduler.initial.delay=180000
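The new block mirrors the existing vuln.mirror processor configuration. max.concurrency=-1 presumably leaves the degree of parallelism to the framework, processing.order=key matches the Parallel Consumer's key ordering mode (records sharing a key stay in order while different keys are processed concurrently), and the retry.* values read as an exponential backoff: 1 s initial delay, doubling per attempt, randomized by roughly 30 %, capped at 3 minutes. A deployment that wants to pin the concurrency could override the property, for example (hypothetical value, not part of this commit):

# Hypothetical override, not part of this commit:
alpine.kafka.processor.repo.meta.analysis.result.max.concurrency=6

Assuming the usual convention of upper-casing property names and replacing dots with underscores also applies here, the same override could be supplied as the environment variable ALPINE_KAFKA_PROCESSOR_REPO_META_ANALYSIS_RESULT_MAX_CONCURRENCY.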
