Skip to content

Commit

Permalink
Merge pull request #637 from DependencyTrack/issue-907-4
Browse files Browse the repository at this point in the history
Migrate `VulnerabilityScanResultProcessor` from Kafka Streams to Parallel Consumer
  • Loading branch information
nscuro authored Apr 22, 2024
2 parents fee43d8 + b2b61b6 commit 40dead3
Show file tree
Hide file tree
Showing 52 changed files with 1,515 additions and 3,997 deletions.
11 changes: 1 addition & 10 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,7 @@
<artifactId>kafka-clients</artifactId>
<version>${lib.kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${lib.kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>${lib.kafka.version}</version>
</dependency>

<!-- shedlock dependencies -->
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
Expand Down
9 changes: 0 additions & 9 deletions src/main/java/org/dependencytrack/common/ConfigKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,8 @@ public enum ConfigKey implements Config.Key {
KEY_STORE_PATH("kafka.keystore.path", ""),

KEY_STORE_PASSWORD("kafka.keystore.password", ""),
KAFKA_NUM_STREAM_THREADS("kafka.num.stream.threads", 1),
KAFKA_TOPIC_PREFIX("kafka.topic.prefix", ""),
KAFKA_PRODUCER_DRAIN_TIMEOUT_DURATION("kafka.producer.drain.timeout.duration", "PT30S"),
KAFKA_STREAMS_DESERIALIZATION_EXCEPTION_THRESHOLD_COUNT("kafka.streams.deserialization.exception.threshold.count", "5"),
KAFKA_STREAMS_DESERIALIZATION_EXCEPTION_THRESHOLD_INTERVAL("kafka.streams.deserialization.exception.threshold.interval", "PT30M"),
KAFKA_STREAMS_METRICS_RECORDING_LEVEL("kafka.streams.metrics.recording.level", "INFO"),
KAFKA_STREAMS_PRODUCTION_EXCEPTION_THRESHOLD_COUNT("kafka.streams.production.exception.threshold.count", "5"),
KAFKA_STREAMS_PRODUCTION_EXCEPTION_THRESHOLD_INTERVAL("kafka.streams.production.exception.threshold.interval", "PT30H"),
KAFKA_STREAMS_TRANSIENT_PROCESSING_EXCEPTION_THRESHOLD_COUNT("kafka.streams.transient.processing.exception.threshold.count", "50"),
KAFKA_STREAMS_TRANSIENT_PROCESSING_EXCEPTION_THRESHOLD_INTERVAL("kafka.streams.transient.processing.exception.threshold.interval", "PT30M"),
KAFKA_STREAMS_DRAIN_TIMEOUT_DURATION("kafka.streams.drain.timeout.duration", "PT30S"),

CRON_EXPRESSION_FOR_PORTFOLIO_METRICS_TASK("task.cron.metrics.portfolio", "10 * * * *"),
CRON_EXPRESSION_FOR_VULNERABILITY_METRICS_TASK("task.cron.metrics.vulnerability", "40 * * * *"),
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/dependencytrack/common/MdcKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
*/
public final class MdcKeys {

public static final String MDC_COMPONENT_UUID = "componentUuid";
public static final String MDC_KAFKA_RECORD_TOPIC = "kafkaRecordTopic";
public static final String MDC_KAFKA_RECORD_PARTITION = "kafkaRecordPartition";
public static final String MDC_KAFKA_RECORD_OFFSET = "kafkaRecordOffset";
public static final String MDC_KAFKA_RECORD_KEY = "kafkaRecordKey";
public static final String MDC_SCAN_TOKEN = "scanToken";

private MdcKeys() {
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
* Utility class to convert {@link alpine.event.framework.Event}s and {@link alpine.notification.Notification}s
* to {@link KafkaEvent}s.
*/
final class KafkaEventConverter {
public final class KafkaEventConverter {

private KafkaEventConverter() {
}
Expand All @@ -80,7 +80,7 @@ private KafkaEventConverter() {
return convert(protoNotification);
}

static KafkaEvent<?, ?> convert(final Notification notification) {
public static KafkaEvent<?, ?> convert(final Notification notification) {
final Topic<String, Notification> topic = extractDestinationTopic(notification);

final String recordKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ private static Producer<byte[], byte[]> createProducer() {
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.SNAPPY.name);
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.put(ProducerConfig.LINGER_MS_CONFIG, "100");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
if (Config.getInstance().getPropertyAsBoolean(ConfigKey.KAFKA_TLS_ENABLED)) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Config.getInstance().getProperty(ConfigKey.KAFKA_TLS_PROTOCOL));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public final class KafkaTopics {
public static final Topic<String, AnalysisResult> REPO_META_ANALYSIS_RESULT;
public static final Topic<ScanKey, ScanCommand> VULN_ANALYSIS_COMMAND;
public static final Topic<ScanKey, ScanResult> VULN_ANALYSIS_RESULT;
public static final Topic<String, ScanResult> VULN_ANALYSIS_RESULT_PROCESSED;

public static final Topic<String, Notification> NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE;
public static final Topic<String, EpssItem> NEW_EPSS;
Expand All @@ -78,6 +79,7 @@ public final class KafkaTopics {
REPO_META_ANALYSIS_RESULT = new Topic<>("dtrack.repo-meta-analysis.result", Serdes.String(), new KafkaProtobufSerde<>(AnalysisResult.parser()));
VULN_ANALYSIS_COMMAND = new Topic<>("dtrack.vuln-analysis.component", new KafkaProtobufSerde<>(ScanKey.parser()), new KafkaProtobufSerde<>(ScanCommand.parser()));
VULN_ANALYSIS_RESULT = new Topic<>("dtrack.vuln-analysis.result", new KafkaProtobufSerde<>(ScanKey.parser()), new KafkaProtobufSerde<>(ScanResult.parser()));
VULN_ANALYSIS_RESULT_PROCESSED = new Topic<>("dtrack.vuln-analysis.result.processed", Serdes.String(), new KafkaProtobufSerde<>(ScanResult.parser()));
NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE = new Topic<>("dtrack.notification.project-vuln-analysis-complete", Serdes.String(), NOTIFICATION_SERDE);
NEW_EPSS = new Topic<>("dtrack.epss", Serdes.String(), new KafkaProtobufSerde<>(EpssItem.parser()));
}
Expand Down
Loading

0 comments on commit 40dead3

Please sign in to comment.