Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate VulnerabilityScanResultProcessor from Kafka Streams to Parallel Consumer #637

Merged
merged 26 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
528c5e9
Migrate `VulnerabilityScanResultProcessor` from Kafka Streams to Parallel Consumer
nscuro Mar 26, 2024
6a193e4
Add dispatch `ProjectVulnAnalysisComplete` notifications
nscuro Mar 27, 2024
7ace14a
Enable Kafka processor health check
nscuro Mar 27, 2024
b3a9d40
Ensure JAR is accessible when `COPY`ing it into container
nscuro Mar 27, 2024
a52d557
Fix division by 0 error for `ScanResult`s without `ScannerResult`s
nscuro Mar 28, 2024
2b46d23
Correctly propagate failure reason from `VulnerabilityScan` to `WorkflowState`
nscuro Mar 28, 2024
507d97f
Assert dispatch of in-memory events upon vulnerability scan completion
nscuro Mar 28, 2024
a4d21c9
Remove unused transient `WorkflowState#parentId` field
nscuro Mar 28, 2024
98d3e43
Use `chown` instead of `chmod` when copying JAR in `Dockerfile`
nscuro Mar 28, 2024
6102a1b
Handle delayed `BOM_PROCESSED` notifications
nscuro Mar 28, 2024
72a37b4
Fix `BOM_PROCESSED` notification construction
nscuro Mar 28, 2024
895ca66
Migrate `VulnerabilityScanResultProcessorTest`
nscuro Mar 28, 2024
0128429
Nuke Kafka Streams 💣
nscuro Mar 28, 2024
7412fc2
Fix missing dispatch of `BOM_PROCESSED` notification
nscuro Mar 28, 2024
e0d8d1c
Fix regression in `BOM_PROCESSED` notification content
nscuro Mar 28, 2024
5918d76
Remove unused code
nscuro Mar 28, 2024
a8065df
Fix duplicate metrics update events being dispatched for vuln scans o…
nscuro Apr 2, 2024
bf6522f
Add property to control shutdown timeout for processors
nscuro Apr 3, 2024
5e1c982
Do not block when dispatching events in `VulnerabilityScanResultProcessor`
nscuro Apr 11, 2024
856c0ec
Harmonize processor names
nscuro Apr 11, 2024
b4c5be6
Set default producer `linger.ms` to `100`
nscuro Apr 11, 2024
3564474
Merge branch 'main' of github.com:DependencyTrack/hyades-apiserver in…
nscuro Apr 17, 2024
861585e
Add annotations for new config properties
nscuro Apr 17, 2024
093284b
Revert unrelated `Dockerfile` change
nscuro Apr 17, 2024
7179af5
Remove `kafka.num.stream.threads` property
nscuro Apr 17, 2024
b2b61b6
Merge branch 'main' of github.com:DependencyTrack/hyades-apiserver in…
nscuro Apr 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,7 @@
<artifactId>kafka-clients</artifactId>
<version>${lib.kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${lib.kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>${lib.kafka.version}</version>
</dependency>

<!-- shedlock dependencies -->
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
Expand Down
9 changes: 0 additions & 9 deletions src/main/java/org/dependencytrack/common/ConfigKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,8 @@ public enum ConfigKey implements Config.Key {
KEY_STORE_PATH("kafka.keystore.path", ""),

KEY_STORE_PASSWORD("kafka.keystore.password", ""),
KAFKA_NUM_STREAM_THREADS("kafka.num.stream.threads", 1),
KAFKA_TOPIC_PREFIX("kafka.topic.prefix", ""),
KAFKA_PRODUCER_DRAIN_TIMEOUT_DURATION("kafka.producer.drain.timeout.duration", "PT30S"),
KAFKA_STREAMS_DESERIALIZATION_EXCEPTION_THRESHOLD_COUNT("kafka.streams.deserialization.exception.threshold.count", "5"),
KAFKA_STREAMS_DESERIALIZATION_EXCEPTION_THRESHOLD_INTERVAL("kafka.streams.deserialization.exception.threshold.interval", "PT30M"),
KAFKA_STREAMS_METRICS_RECORDING_LEVEL("kafka.streams.metrics.recording.level", "INFO"),
KAFKA_STREAMS_PRODUCTION_EXCEPTION_THRESHOLD_COUNT("kafka.streams.production.exception.threshold.count", "5"),
KAFKA_STREAMS_PRODUCTION_EXCEPTION_THRESHOLD_INTERVAL("kafka.streams.production.exception.threshold.interval", "PT30H"),
KAFKA_STREAMS_TRANSIENT_PROCESSING_EXCEPTION_THRESHOLD_COUNT("kafka.streams.transient.processing.exception.threshold.count", "50"),
KAFKA_STREAMS_TRANSIENT_PROCESSING_EXCEPTION_THRESHOLD_INTERVAL("kafka.streams.transient.processing.exception.threshold.interval", "PT30M"),
KAFKA_STREAMS_DRAIN_TIMEOUT_DURATION("kafka.streams.drain.timeout.duration", "PT30S"),

CRON_EXPRESSION_FOR_PORTFOLIO_METRICS_TASK("task.cron.metrics.portfolio", "10 * * * *"),
CRON_EXPRESSION_FOR_VULNERABILITY_METRICS_TASK("task.cron.metrics.vulnerability", "40 * * * *"),
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/dependencytrack/common/MdcKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
*/
public final class MdcKeys {

public static final String MDC_COMPONENT_UUID = "componentUuid";
public static final String MDC_KAFKA_RECORD_TOPIC = "kafkaRecordTopic";
public static final String MDC_KAFKA_RECORD_PARTITION = "kafkaRecordPartition";
public static final String MDC_KAFKA_RECORD_OFFSET = "kafkaRecordOffset";
public static final String MDC_KAFKA_RECORD_KEY = "kafkaRecordKey";
public static final String MDC_SCAN_TOKEN = "scanToken";

private MdcKeys() {
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
* Utility class to convert {@link alpine.event.framework.Event}s and {@link alpine.notification.Notification}s
* to {@link KafkaEvent}s.
*/
final class KafkaEventConverter {
public final class KafkaEventConverter {

private KafkaEventConverter() {
}
Expand All @@ -80,7 +80,7 @@ private KafkaEventConverter() {
return convert(protoNotification);
}

static KafkaEvent<?, ?> convert(final Notification notification) {
public static KafkaEvent<?, ?> convert(final Notification notification) {
final Topic<String, Notification> topic = extractDestinationTopic(notification);

final String recordKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ private static Producer<byte[], byte[]> createProducer() {
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.SNAPPY.name);
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.put(ProducerConfig.LINGER_MS_CONFIG, "100");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
if (Config.getInstance().getPropertyAsBoolean(ConfigKey.KAFKA_TLS_ENABLED)) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Config.getInstance().getProperty(ConfigKey.KAFKA_TLS_PROTOCOL));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public final class KafkaTopics {
public static final Topic<String, AnalysisResult> REPO_META_ANALYSIS_RESULT;
public static final Topic<ScanKey, ScanCommand> VULN_ANALYSIS_COMMAND;
public static final Topic<ScanKey, ScanResult> VULN_ANALYSIS_RESULT;
public static final Topic<String, ScanResult> VULN_ANALYSIS_RESULT_PROCESSED;

public static final Topic<String, Notification> NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE;
public static final Topic<String, EpssItem> NEW_EPSS;
Expand All @@ -78,6 +79,7 @@ public final class KafkaTopics {
REPO_META_ANALYSIS_RESULT = new Topic<>("dtrack.repo-meta-analysis.result", Serdes.String(), new KafkaProtobufSerde<>(AnalysisResult.parser()));
VULN_ANALYSIS_COMMAND = new Topic<>("dtrack.vuln-analysis.component", new KafkaProtobufSerde<>(ScanKey.parser()), new KafkaProtobufSerde<>(ScanCommand.parser()));
VULN_ANALYSIS_RESULT = new Topic<>("dtrack.vuln-analysis.result", new KafkaProtobufSerde<>(ScanKey.parser()), new KafkaProtobufSerde<>(ScanResult.parser()));
VULN_ANALYSIS_RESULT_PROCESSED = new Topic<>("dtrack.vuln-analysis.result.processed", Serdes.String(), new KafkaProtobufSerde<>(ScanResult.parser()));
NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE = new Topic<>("dtrack.notification.project-vuln-analysis-complete", Serdes.String(), NOTIFICATION_SERDE);
NEW_EPSS = new Topic<>("dtrack.epss", Serdes.String(), new KafkaProtobufSerde<>(EpssItem.parser()));
}
Expand Down
Loading