Skip to content

Commit 8ff235b

Browse files
authored
Merge pull request #553 from DependencyTrack/issue-907-2
Migrate `MirrorVulnerabilityProcessor` from Kafka Streams to Parallel Consumer
2 parents 570e0cc + 4aa68ef commit 8ff235b

File tree

5 files changed

+72
-80
lines changed

5 files changed

+72
-80
lines changed

src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.dependencytrack.event.kafka.processor;
22

33
import alpine.common.logging.Logger;
4+
import org.dependencytrack.event.kafka.KafkaTopics;
45
import org.dependencytrack.event.kafka.processor.api.ProcessorManager;
56

67
import javax.servlet.ServletContextEvent;
@@ -16,7 +17,8 @@ public class ProcessorInitializer implements ServletContextListener {
1617
public void contextInitialized(final ServletContextEvent event) {
1718
LOGGER.info("Initializing processors");
1819

19-
// TODO: Register processor here!
20+
PROCESSOR_MANAGER.registerProcessor(VulnerabilityMirrorProcessor.PROCESSOR_NAME,
21+
KafkaTopics.NEW_VULNERABILITY, new VulnerabilityMirrorProcessor());
2022

2123
PROCESSOR_MANAGER.startAll();
2224
}

src/main/java/org/dependencytrack/event/kafka/streams/processor/MirrorVulnerabilityProcessor.java src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityMirrorProcessor.java

+15-23
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
1-
package org.dependencytrack.event.kafka.streams.processor;
1+
package org.dependencytrack.event.kafka.processor;
22

33
import alpine.common.logging.Logger;
4-
import alpine.common.metrics.Metrics;
54
import com.github.packageurl.MalformedPackageURLException;
65
import com.github.packageurl.PackageURL;
7-
import io.micrometer.core.instrument.Timer;
86
import org.apache.commons.lang3.StringUtils;
9-
import org.apache.kafka.streams.processor.api.Processor;
10-
import org.apache.kafka.streams.processor.api.Record;
7+
import org.apache.kafka.clients.consumer.ConsumerRecord;
118
import org.cyclonedx.proto.v1_4.Bom;
129
import org.cyclonedx.proto.v1_4.Component;
1310
import org.cyclonedx.proto.v1_4.VulnerabilityAffects;
11+
import org.dependencytrack.event.kafka.processor.api.Processor;
1412
import org.dependencytrack.model.Vulnerability;
1513
import org.dependencytrack.model.VulnerableSoftware;
1614
import org.dependencytrack.parser.dependencytrack.ModelConverterCdxToVuln;
@@ -27,19 +25,18 @@
2725
import java.util.List;
2826
import java.util.Optional;
2927

28+
/**
29+
* A {@link Processor} that ingests vulnerability data from CycloneDX Bill of Vulnerabilities.
30+
*/
31+
public class VulnerabilityMirrorProcessor implements Processor<String, Bom> {
3032

31-
public class MirrorVulnerabilityProcessor implements Processor<String, Bom, Void, Void> {
33+
static final String PROCESSOR_NAME = "vuln.mirror";
3234

33-
private static final Logger LOGGER = Logger.getLogger(MirrorVulnerabilityProcessor.class);
34-
private static final Timer TIMER = Timer.builder("vuln_mirror_processing")
35-
.description("Time taken to process mirrored vulnerabilities")
36-
.register(Metrics.getRegistry());
35+
private static final Logger LOGGER = Logger.getLogger(VulnerabilityMirrorProcessor.class);
3736

3837
@Override
39-
public void process(final Record<String, Bom> record) {
40-
final Timer.Sample timerSample = Timer.start();
41-
42-
try (QueryManager qm = new QueryManager().withL2CacheDisabled()) {
38+
public void process(final ConsumerRecord<String, Bom> record) {
39+
try (QueryManager qm = new QueryManager()) {
4340
LOGGER.debug("Synchronizing Mirrored Vulnerability : " + record.key());
4441
Bom bom = record.value();
4542
String key = record.key();
@@ -112,11 +109,6 @@ public void process(final Record<String, Bom> record) {
112109
synchronizedVulnerability.setVulnerableSoftware(reconciledVsList);
113110
}
114111
qm.persist(synchronizedVulnerability);
115-
} catch (Exception e) {
116-
// TODO: Send record to a dead letter topic.
117-
LOGGER.error("Synchronizing vulnerability %s failed".formatted(record.key()), e);
118-
} finally {
119-
timerSample.stop(TIMER);
120112
}
121113
}
122114

@@ -230,8 +222,8 @@ public VulnerableSoftware mapAffectedRangeToVulnerableSoftware(final QueryManage
230222

231223
for (final Constraint constraint : vers.constraints()) {
232224
if (constraint.version() == null
233-
|| constraint.version().equals("0")
234-
|| constraint.version().equals("*")) {
225+
|| constraint.version().equals("0")
226+
|| constraint.version().equals("*")) {
235227
// Semantically, ">=0" is equivalent to versionStartIncluding=null,
236228
// and ">0" is equivalent to versionStartExcluding=null.
237229
//
@@ -253,12 +245,12 @@ public VulnerableSoftware mapAffectedRangeToVulnerableSoftware(final QueryManage
253245
}
254246

255247
if (versionStartIncluding == null && versionStartExcluding == null
256-
&& versionEndIncluding == null && versionEndExcluding == null) {
248+
&& versionEndIncluding == null && versionEndExcluding == null) {
257249
LOGGER.warn("Unable to assemble a version range from %s for %s".formatted(vers, vulnId));
258250
return null;
259251
}
260252
if ((versionStartIncluding != null || versionStartExcluding != null)
261-
&& (versionEndIncluding == null && versionEndExcluding == null)) {
253+
&& (versionEndIncluding == null && versionEndExcluding == null)) {
262254
LOGGER.warn("Skipping indefinite version range assembled from %s for %s".formatted(vers, vulnId));
263255
return null;
264256
}

src/main/java/org/dependencytrack/event/kafka/streams/KafkaStreamsTopologyFactory.java

-7
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.dependencytrack.event.ProjectPolicyEvaluationEvent;
2424
import org.dependencytrack.event.kafka.KafkaTopics;
2525
import org.dependencytrack.event.kafka.streams.processor.DelayedBomProcessedNotificationProcessor;
26-
import org.dependencytrack.event.kafka.streams.processor.MirrorVulnerabilityProcessor;
2726
import org.dependencytrack.event.kafka.streams.processor.RepositoryMetaResultProcessor;
2827
import org.dependencytrack.event.kafka.streams.processor.VulnerabilityScanResultProcessor;
2928
import org.dependencytrack.model.VulnerabilityScan;
@@ -224,12 +223,6 @@ Topology createTopology() {
224223
.withName("consume_from_%s_topic".formatted(KafkaTopics.REPO_META_ANALYSIS_RESULT.name())))
225224
.process(RepositoryMetaResultProcessor::new, Named.as("process_repo_meta_analysis_result"));
226225

227-
streamsBuilder
228-
.stream(KafkaTopics.NEW_VULNERABILITY.name(),
229-
Consumed.with(KafkaTopics.NEW_VULNERABILITY.keySerde(), KafkaTopics.NEW_VULNERABILITY.valueSerde())
230-
.withName("consume_from_%s_topic".formatted(KafkaTopics.NEW_VULNERABILITY.name())))
231-
.process(MirrorVulnerabilityProcessor::new, Named.as("process_mirror_vulnerability"));
232-
233226
return streamsBuilder.build(streamsProperties);
234227
}
235228

src/main/resources/application.properties

+13
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,19 @@ kafka.streams.transient.processing.exception.threshold.interval=PT30M
491491
# Refer to https://kafka.apache.org/documentation/#consumerconfigs for available options.
492492
# alpine.kafka.processor.<name>.consumer.<consumer.config.name>=
493493

494+
# Required
495+
# Configures the Kafka processor responsible for ingesting mirrored vulnerability
496+
# data from the dtrack.vulnerability topic. The processor only occasionally receives
497+
# records, such that high concurrency is usually not justified.
498+
alpine.kafka.processor.vuln.mirror.max.concurrency=-1
499+
alpine.kafka.processor.vuln.mirror.processing.order=partition
500+
alpine.kafka.processor.vuln.mirror.retry.initial.delay.ms=3000
501+
alpine.kafka.processor.vuln.mirror.retry.multiplier=2
502+
alpine.kafka.processor.vuln.mirror.retry.randomization.factor=0.3
503+
alpine.kafka.processor.vuln.mirror.retry.max.delay.ms=180000
504+
alpine.kafka.processor.vuln.mirror.consumer.group.id=dtrack-apiserver-processor
505+
alpine.kafka.processor.vuln.mirror.consumer.auto.offset.reset=earliest
506+
494507
# Scheduling tasks after 3 minutes (3*60*1000) of starting application
495508
task.scheduler.initial.delay=180000
496509

src/test/java/org/dependencytrack/event/kafka/streams/processor/MirrorVulnerabilityProcessorTest.java src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityMirrorProcessorTest.java

+41-49
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,26 @@
1-
package org.dependencytrack.event.kafka.streams.processor;
1+
package org.dependencytrack.event.kafka.processor;
22

3-
import org.apache.kafka.common.serialization.Serdes;
4-
import org.apache.kafka.common.serialization.StringSerializer;
5-
import org.apache.kafka.streams.StreamsBuilder;
6-
import org.apache.kafka.streams.TestInputTopic;
7-
import org.apache.kafka.streams.TopologyTestDriver;
8-
import org.apache.kafka.streams.kstream.Consumed;
9-
import org.cyclonedx.proto.v1_4.Bom;
10-
import org.dependencytrack.PersistenceCapableTest;
11-
import org.dependencytrack.event.kafka.serialization.KafkaProtobufSerde;
12-
import org.dependencytrack.event.kafka.serialization.KafkaProtobufSerializer;
133
import org.dependencytrack.model.Severity;
144
import org.dependencytrack.model.Vulnerability;
155
import org.dependencytrack.persistence.CweImporter;
16-
import org.dependencytrack.util.KafkaTestUtil;
17-
import org.junit.After;
186
import org.junit.Before;
197
import org.junit.Test;
208

219
import static org.assertj.core.api.Assertions.assertThat;
10+
import static org.dependencytrack.util.KafkaTestUtil.generateBomFromJson;
2211

23-
public class MirrorVulnerabilityProcessorTest extends PersistenceCapableTest {
24-
25-
private TopologyTestDriver testDriver;
26-
private TestInputTopic<String, Bom> inputTopic;
12+
public class VulnerabilityMirrorProcessorTest extends AbstractProcessorTest {
2713

2814
@Before
29-
public void setUp() throws Exception {
30-
final var streamsBuilder = new StreamsBuilder();
31-
streamsBuilder
32-
.stream("input-topic", Consumed
33-
.with(Serdes.String(), new KafkaProtobufSerde<>(Bom.parser())))
34-
.process(MirrorVulnerabilityProcessor::new);
35-
36-
testDriver = new TopologyTestDriver(streamsBuilder.build());
37-
inputTopic = testDriver.createInputTopic("input-topic",
38-
new StringSerializer(), new KafkaProtobufSerializer<>());
15+
public void before() throws Exception {
16+
super.before();
3917

4018
new CweImporter().processCweDefinitions(); // Required for CWE mapping
4119
}
4220

43-
@After
44-
public void tearDown() {
45-
if (testDriver != null) {
46-
testDriver.close();
47-
}
48-
}
49-
5021
@Test
5122
public void testProcessNvdVuln() throws Exception {
52-
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
23+
final var bovJson = """
5324
{
5425
"components": [
5526
{
@@ -91,7 +62,10 @@ public void testProcessNvdVuln() throws Exception {
9162
{ "url": "https://github.com/thinkcmf/thinkcmf/issues/736" }
9263
]
9364
}
94-
"""));
65+
""";
66+
67+
final var processor = new VulnerabilityMirrorProcessor();
68+
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());
9569

9670
final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
9771
assertThat(vuln).isNotNull();
@@ -160,7 +134,7 @@ public void testProcessNvdVuln() throws Exception {
160134

161135
@Test
162136
public void testProcessGitHubVuln() throws Exception {
163-
inputTopic.pipeInput("GITHUB/GHSA-fxwm-579q-49qq", KafkaTestUtil.generateBomFromJson("""
137+
final var bovJson = """
164138
{
165139
"components": [
166140
{
@@ -223,7 +197,10 @@ public void testProcessGitHubVuln() throws Exception {
223197
{ "url": "https://github.com/advisories/GHSA-fxwm-579q-49qq" }
224198
]
225199
}
226-
"""));
200+
""";
201+
202+
final var processor = new VulnerabilityMirrorProcessor();
203+
processor.process(aConsumerRecord("GITHUB/GHSA-fxwm-579q-49qq", generateBomFromJson(bovJson)).build());
227204

228205
final Vulnerability vuln = qm.getVulnerabilityByVulnId("GITHUB", "GHSA-fxwm-579q-49qq");
229206
assertThat(vuln).isNotNull();
@@ -375,7 +352,7 @@ public void testProcessGitHubVuln() throws Exception {
375352

376353
@Test
377354
public void testProcessOsvVuln() throws Exception {
378-
inputTopic.pipeInput("OSV/GHSA-2cc5-23r7-vc4v", KafkaTestUtil.generateBomFromJson("""
355+
final var bovJson = """
379356
{
380357
"components": [
381358
{
@@ -427,7 +404,10 @@ public void testProcessOsvVuln() throws Exception {
427404
{ "url": "https://github.com/ratpack/ratpack/blob/29434f7ac6fd4b36a4495429b70f4c8163100332/ratpack-session/src/main/java/ratpack/session/clientside/ClientSideSessionConfig.java#L29" }
428405
]
429406
}
430-
"""));
407+
""";
408+
409+
final var processor = new VulnerabilityMirrorProcessor();
410+
processor.process(aConsumerRecord("OSV/GHSA-2cc5-23r7-vc4v", generateBomFromJson(bovJson)).build());
431411

432412
final Vulnerability vuln = qm.getVulnerabilityByVulnId("GITHUB", "GHSA-2cc5-23r7-vc4v");
433413
assertThat(vuln).isNotNull();
@@ -555,7 +535,7 @@ public void testProcessOsvVuln() throws Exception {
555535

556536
@Test
557537
public void testProcessVulnWithoutAffects() throws Exception {
558-
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
538+
final var bovJson = """
559539
{
560540
"components": [
561541
{
@@ -573,7 +553,10 @@ public void testProcessVulnWithoutAffects() throws Exception {
573553
}
574554
]
575555
}
576-
"""));
556+
""";
557+
558+
final var processor = new VulnerabilityMirrorProcessor();
559+
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());
577560

578561
final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
579562
assertThat(vuln).isNotNull();
@@ -613,7 +596,7 @@ public void testProcessVulnWithoutAffects() throws Exception {
613596

614597
@Test
615598
public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception {
616-
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
599+
final var bovJson = """
617600
{
618601
"components": [
619602
{
@@ -639,7 +622,10 @@ public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception {
639622
}
640623
]
641624
}
642-
"""));
625+
""";
626+
627+
final var processor = new VulnerabilityMirrorProcessor();
628+
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());
643629

644630
final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
645631
assertThat(vuln).isNotNull();
@@ -679,7 +665,7 @@ public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception {
679665

680666
@Test
681667
public void testProcessVulnWithVersConstraints() throws Exception {
682-
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
668+
final var bovJson = """
683669
{
684670
"components": [
685671
{
@@ -731,7 +717,10 @@ public void testProcessVulnWithVersConstraints() throws Exception {
731717
}
732718
]
733719
}
734-
"""));
720+
""";
721+
722+
final var processor = new VulnerabilityMirrorProcessor();
723+
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());
735724

736725
final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
737726
assertThat(vuln).isNotNull();
@@ -935,7 +924,7 @@ public void testProcessVulnWithVersConstraints() throws Exception {
935924

936925
@Test
937926
public void testProcessVulnWithInvalidCpeOrPurl() throws Exception {
938-
inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson("""
927+
final var bovJson = """
939928
{
940929
"components": [
941930
{
@@ -997,7 +986,10 @@ public void testProcessVulnWithInvalidCpeOrPurl() throws Exception {
997986
}
998987
]
999988
}
1000-
"""));
989+
""";
990+
991+
final var processor = new VulnerabilityMirrorProcessor();
992+
processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build());
1001993

1002994
final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489");
1003995
assertThat(vuln).isNotNull();
@@ -1035,4 +1027,4 @@ public void testProcessVulnWithInvalidCpeOrPurl() throws Exception {
10351027
assertThat(vuln.getVulnerableSoftware()).isEmpty();
10361028
}
10371029

1038-
}
1030+
}

0 commit comments

Comments
 (0)