Migrate VulnerabilityScanResultProcessor from Kafka Streams to Parallel Consumer #637

Merged
merged 26 commits into main from issue-907-4 on Apr 22, 2024

Conversation

nscuro
Member

@nscuro nscuro commented Mar 26, 2024

Description

This PR migrates the VulnerabilityScanResultProcessor from Kafka Streams to the new Parallel Consumer-based processor API. As this was the last piece of functionality relying on Kafka Streams, this PR removes Kafka Streams entirely.

Addressed Issue

Relates to DependencyTrack/hyades#346
Relates to DependencyTrack/hyades#901
Relates to DependencyTrack/hyades#907

Additional Details

A new processor is introduced: ProcessedVulnerabilityScanResultProcessor.
Its job is to handle ScanResults that were successfully processed by the API server, and:

  • Count them towards the completion of VulnerabilityScans
  • When a VulnerabilityScan is detected to be complete:
    • Dispatch a PROJECT_VULN_ANALYSIS_COMPLETE notification
    • Trigger policy evaluation and metrics update events
    • If enabled, dispatch a delayed BOM_PROCESSED notification
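
For illustration only, the per-batch flow could be sketched roughly as follows. Everything here (the VulnScan/ProcessedResult placeholder types and all helper methods) is a hypothetical stand-in, not code from this PR or the actual ProcessedVulnerabilityScanResultProcessor:

// Hypothetical sketch, not the actual implementation in this PR.
import java.util.List;
import java.util.UUID;

final class VulnScanCompletionSketch {

    // Stand-in for the persisted VulnerabilityScan aggregate.
    record VulnScan(UUID token, UUID targetIdentifier, boolean failed) {}

    // Stand-in for a single scan result that the API server already processed.
    record ProcessedResult(UUID scanToken, boolean scannerFailed) {}

    // Handles one batch of processed results instead of one record at a time.
    void process(final List<ProcessedResult> batch) {
        // 1. Count every result of the batch towards its VulnerabilityScan,
        //    collecting the scans that have just become complete.
        final List<VulnScan> completedScans = recordResults(batch);

        // 2. For each newly completed scan, perform the follow-up work that the
        //    Kafka Streams topology previously did record by record.
        for (final VulnScan scan : completedScans) {
            dispatchProjectVulnAnalysisCompleteNotification(scan);
            triggerPolicyEvaluationAndMetricsUpdate(scan);
            maybeDispatchDelayedBomProcessedNotification(scan);
        }
    }

    private List<VulnScan> recordResults(final List<ProcessedResult> batch) {
        return List.of(); // the real processor updates the database here
    }

    private void dispatchProjectVulnAnalysisCompleteNotification(final VulnScan scan) {
        // would emit a PROJECT_VULN_ANALYSIS_COMPLETE notification
    }

    private void triggerPolicyEvaluationAndMetricsUpdate(final VulnScan scan) {
        // would dispatch chained policy evaluation and metrics update events
    }

    private void maybeDispatchDelayedBomProcessedNotification(final VulnScan scan) {
        // would emit a delayed BOM_PROCESSED notification when the feature is enabled
    }
}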

This was previously a large chunk of the Kafka Streams topology:

        .map((scanKey, scanResult) -> {
            // Drop vulnerabilities from scanner results, as they can be rather large, and we don't need them anymore.
            // Dropping them will save us some compression and network overhead during the repartition.
            // We can remove this step should we ever need access to the vulnerabilities again.
            final var strippedScanResult = scanResult.toBuilder()
                    .clearScannerResults()
                    .addAllScannerResults(scanResult.getScannerResultsList().stream()
                            .map(scannerResult -> scannerResult.toBuilder()
                                    .clearBom()
                                    .build())
                            .toList())
                    .build();
            return KeyValue.pair(scanKey.getScanToken(), strippedScanResult);
        }, Named.as("re-key_scan-result_to_scan-token"))
        .repartition(Repartitioned
                .with(Serdes.String(), KafkaTopics.VULN_ANALYSIS_RESULT.valueSerde())
                .withName("processed-vuln-scan-result-by-scan-token"))
        .mapValues((scanToken, scanResult) -> {
            try (final var qm = new QueryManager()) {
                return qm.recordVulnerabilityScanResult(scanToken, scanResult);
            }
        }, Named.as("record_processed_vuln_scan_result"))
        .filter((scanToken, vulnScan) -> vulnScan != null,
                Named.as("filter_completed_vuln_scans"))
        .mapValues((scanToken, vulnScan) -> {
            final double failureRate = (double) vulnScan.getScanFailed() / vulnScan.getScanTotal();
            if (failureRate > vulnScan.getFailureThreshold()) {
                try (var qm = new QueryManager()) {
                    // Detach VulnerabilityScan objects when committing changes. Without this,
                    // all fields except the ID field will be unloaded on commit (the object will become HOLLOW).
                    qm.getPersistenceManager().setProperty(PropertyNames.PROPERTY_DETACH_ALL_ON_COMMIT, "true");
                    vulnScan = qm.updateVulnerabilityScanStatus(vulnScan.getToken(), VulnerabilityScan.Status.FAILED);
                    vulnScan.setFailureReason("Failure threshold of " + vulnScan.getFailureThreshold() + "% exceeded: " + failureRate + "% of scans failed");
                    LOGGER.warn("Detected failure of vulnerability scan (token=%s, targetType=%s, targetIdentifier=%s): %s"
                            .formatted(vulnScan.getToken(), vulnScan.getTargetType(), vulnScan.getTargetIdentifier(), vulnScan.getFailureReason()));
                }
            }
            return vulnScan;
        }, Named.as("evaluate_vuln_scan_failure_rate"));

completedVulnScanStream
        .foreach((scanToken, vulnScan) -> {
            try (var qm = new QueryManager()) {
                final WorkflowState vulnAnalysisState = qm.getWorkflowStateByTokenAndStep(UUID.fromString(scanToken), WorkflowStep.VULN_ANALYSIS);
                if (vulnAnalysisState == null) {
                    // No workflow exists for this scan; Nothing to update.
                    return;
                }
                if (vulnScan.getStatus() == VulnerabilityScan.Status.FAILED) {
                    vulnAnalysisState.setStatus(WorkflowStatus.FAILED);
                    vulnAnalysisState.setUpdatedAt(new Date());
                    vulnAnalysisState.setFailureReason(vulnScan.getFailureReason());
                    final WorkflowState updatedVulnAnalysisState = qm.updateWorkflowState(vulnAnalysisState);
                    qm.updateAllDescendantStatesOfParent(updatedVulnAnalysisState, WorkflowStatus.CANCELLED, Date.from(Instant.now()));
                    return;
                }
                vulnAnalysisState.setStatus(WorkflowStatus.COMPLETED);
                vulnAnalysisState.setUpdatedAt(Date.from(Instant.now()));
                qm.updateWorkflowState(vulnAnalysisState);
            }
        }, Named.as("update_vuln_analysis_workflow_status"));

final KStream<String, VulnerabilityScan> completedVulnScanWithProjectTargetStream = completedVulnScanStream
        .filter((scanToken, vulnScan) -> vulnScan.getTargetType() == VulnerabilityScan.TargetType.PROJECT,
                Named.as("filter_vuln_scans_with_project_target"));

// For each completed vulnerability scan that targeted a project (opposed to individual components),
// determine its overall status, gather all findings, and emit a PROJECT_VULN_ANALYSIS_COMPLETE notification.
completedVulnScanWithProjectTargetStream
        .map((scanToken, vulnScan) -> {
            final alpine.notification.Notification alpineNotification;
            try {
                alpineNotification = vulnScan.getStatus() == VulnerabilityScan.Status.FAILED
                        ? createProjectVulnerabilityAnalysisCompleteNotification(vulnScan,
                                UUID.fromString(scanToken),
                                ProjectVulnAnalysisStatus.PROJECT_VULN_ANALYSIS_STATUS_FAILED)
                        : createProjectVulnerabilityAnalysisCompleteNotification(
                                vulnScan,
                                UUID.fromString(scanToken),
                                ProjectVulnAnalysisStatus.PROJECT_VULN_ANALYSIS_STATUS_COMPLETED);
            } catch (RuntimeException e) {
                LOGGER.warn("Failed to generate a %s notification (project: %s; token: %s)"
                        .formatted(NotificationGroup.PROJECT_VULN_ANALYSIS_COMPLETE,
                                vulnScan.getTargetIdentifier(), vulnScan.getToken()), e);
                return KeyValue.pair(vulnScan.getTargetIdentifier().toString(), null);
            }
            return KeyValue.pair(vulnScan.getTargetIdentifier().toString(), convert(alpineNotification));
        }, Named.as("map_vuln_scan_to_vuln_analysis_complete_notification"))
        .filter((projectUuid, notification) -> notification != null,
                Named.as("filter_valid_project-vuln-analysis-complete_notification"))
        .to(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name(), Produced
                .with(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.keySerde(),
                        KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.valueSerde())
                .withName("produce_to_%s_topic".formatted(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name())));

// When delaying of BOM_PROCESSED notifications is enabled, emit a BOM_PROCESSED notification
// for each completed vulnerability scan that targeted a project. But only do so when the scan is
// part of a workflow that includes a BOM_PROCESSING step with status COMPLETED.
if (delayBomProcessedNotification) {
    completedVulnScanStream
            .process(DelayedBomProcessedNotificationProcessor::new,
                    Named.as("tmp_delay_bom_processed_notification_process_completed_vuln_scan"))
            .to(KafkaTopics.NOTIFICATION_BOM.name(), Produced
                    .with(KafkaTopics.NOTIFICATION_BOM.keySerde(), KafkaTopics.NOTIFICATION_BOM.valueSerde())
                    .withName("tmp_delay_bom_processed_notification_produce_to_%s_topic".formatted(KafkaTopics.NOTIFICATION_BOM.name())));
}

// For each successfully completed vulnerability scan, trigger a policy evaluation and metrics update
// for the targeted entity (project or individual component).
completedVulnScanStream
        .filter((scanToken, vulnScan) -> vulnScan.getStatus() != VulnerabilityScan.Status.FAILED,
                Named.as("filter_failed_vuln_scans"))
        .foreach((scanToken, vulnScan) -> {
            final ChainableEvent policyEvaluationEvent = switch (vulnScan.getTargetType()) {
                case COMPONENT -> new ComponentPolicyEvaluationEvent(vulnScan.getTargetIdentifier());
                case PROJECT -> new ProjectPolicyEvaluationEvent(vulnScan.getTargetIdentifier());
            };
            policyEvaluationEvent.setChainIdentifier(UUID.fromString(vulnScan.getToken()));
            // Trigger a metrics update no matter if the policy evaluation succeeded or not.
            final ChainableEvent metricsUpdateEvent = switch (vulnScan.getTargetType()) {
                case COMPONENT -> new ComponentMetricsUpdateEvent(vulnScan.getTargetIdentifier());
                case PROJECT -> new ProjectMetricsUpdateEvent(vulnScan.getTargetIdentifier());
            };
            metricsUpdateEvent.setChainIdentifier(UUID.fromString(vulnScan.getToken()));
            policyEvaluationEvent.onFailure(metricsUpdateEvent);
            policyEvaluationEvent.onSuccess(metricsUpdateEvent);
            Event.dispatch(policyEvaluationEvent);
        }, Named.as("trigger_policy_evaluation"));

The downside of doing this in Kafka Streams was that the entire procedure was executed for every single record.

Now, records are consumed in batches (up to 1000 records by default), which drastically increases throughput.
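
For context, a minimal sketch of batched consumption with the Confluent Parallel Consumer library is shown below. The topic name, group id, and concrete option values are illustrative only, and the project's own processor API wires this up differently:

// Minimal sketch of batched consumption with Confluent Parallel Consumer.
// Topic, group id, and option values are illustrative, not taken from this PR.
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.ParallelStreamProcessor;

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

final class BatchedConsumerSketch {

    static ParallelStreamProcessor<byte[], byte[]> start() {
        final var consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");    // illustrative
        consumerProps.put("group.id", "vuln-scan-result-processor"); // illustrative
        consumerProps.put("enable.auto.commit", "false");            // Parallel Consumer manages offsets itself
        consumerProps.put("auto.offset.reset", "latest");

        final var consumer = new KafkaConsumer<>(consumerProps,
                new ByteArrayDeserializer(), new ByteArrayDeserializer());

        final var options = ParallelConsumerOptions.<byte[], byte[]>builder()
                .consumer(consumer)
                .ordering(ProcessingOrder.KEY) // keep per-key ordering, parallelize across keys
                .maxConcurrency(3)
                .batchSize(1000)               // hand records to the handler in batches
                .build();

        final ParallelStreamProcessor<byte[], byte[]> processor =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        processor.subscribe(List.of("dtrack.vuln-analysis.result.processed")); // illustrative topic
        processor.poll(context -> {
            // With batchSize set, each poll callback receives up to 1000 records at once.
            final var records = context.getConsumerRecordsFlattened();
            // ... record results, detect completed scans, dispatch notifications ...
        });
        return processor;
    }
}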

Warning

For existing deployments, alpine.kafka.processor.vuln.scan.result.consumer.auto.offset.reset should be set to latest in order to avoid re-consuming existing vulnerability analysis results.
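
For example, as an application property, or (assuming Alpine's usual property-to-environment-variable mapping of uppercasing and replacing dots with underscores) as an environment variable:

# application.properties
alpine.kafka.processor.vuln.scan.result.consumer.auto.offset.reset=latest

# or, derived from the naming convention above
ALPINE_KAFKA_PROCESSOR_VULN_SCAN_RESULT_CONSUMER_AUTO_OFFSET_RESET=latest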

With the removal of Kafka Streams (which includes RocksDB), the API server JAR is a good 50MB smaller!


Breaking Changes

Checklist

  • I have read and understand the contributing guidelines
  • This PR fixes a defect, and I have provided tests to verify that the fix is effective
  • This PR implements an enhancement, and I have provided tests to verify that it works as intended
  • This PR introduces changes to the database model, and I have added corresponding update logic
  • This PR introduces new or alters existing behavior, and I have updated the documentation accordingly

@nscuro nscuro added the enhancement New feature or request label Mar 26, 2024

codacy-production bot commented Mar 29, 2024

Coverage summary from Codacy

See diff coverage on Codacy

Coverage variation: +1.58% (target: -1.00%)
Diff coverage: 92.71% (target: 70.00%)

Coverage variation details:
  Common ancestor commit (fee43d8): 18640 coverable lines, 14621 covered (78.44% coverage)
  Head commit (b2b61b6): 18010 coverable lines (-630), 14412 covered (-209), 80.02% coverage (+1.58%)

Coverage variation is the difference between the coverage for the head and common ancestor commits of the pull request branch: <coverage of head commit> - <coverage of common ancestor commit>

Diff coverage details:
  Pull request (#637): 247 coverable lines, 229 covered (92.71% diff coverage)

Diff coverage is the percentage of lines that are covered by tests out of the coverable lines that the pull request added or modified: <covered lines added or modified>/<coverable lines added or modified> * 100%


Codacy will stop sending the deprecated coverage status from June 5th, 2024.

nscuro added a commit to DependencyTrack/hyades that referenced this pull request Mar 29, 2024
As introduced in DependencyTrack/hyades-apiserver#637

Also removes Kafka Streams related environment variables for API server from `docker-compose.yml`

Signed-off-by: nscuro <[email protected]>
nscuro added a commit to DependencyTrack/hyades that referenced this pull request Apr 11, 2024
As introduced in DependencyTrack/hyades-apiserver#637

Also removes Kafka Streams related environment variables for API server from `docker-compose.yml`

Signed-off-by: nscuro <[email protected]>
@nscuro nscuro marked this pull request as ready for review April 11, 2024 15:20
nscuro added a commit to DependencyTrack/hyades that referenced this pull request Apr 12, 2024
As introduced in DependencyTrack/hyades-apiserver#637

Also removes Kafka Streams related environment variables for API server from `docker-compose.yml`

Signed-off-by: nscuro <[email protected]>
nscuro added 20 commits April 12, 2024 15:06
The status of the workflow step will always be `COMPLETED`, regardless of whether delayed dispatch is enabled. This is existing behavior, and the regression was caught via `BomProcessedNotificationDelayedE2ET`.

Signed-off-by: nscuro <[email protected]>
Signed-off-by: nscuro <[email protected]>
As a replacement for `KAFKA_STREAMS_DRAIN_TIMEOUT_DURATION`.

Signed-off-by: nscuro <[email protected]>
This is to improve throughput, and to match the default used by Kafka Streams, which we are replacing: https://kafka.apache.org/37/documentation/streams/developer-guide/config-streams.html#default-values

Signed-off-by: nscuro <[email protected]>
nscuro added a commit to DependencyTrack/hyades that referenced this pull request Apr 13, 2024
As introduced in DependencyTrack/hyades-apiserver#637

Also removes Kafka Streams related environment variables for API server from `docker-compose.yml`

Signed-off-by: nscuro <[email protected]>
@nscuro nscuro added this to the 5.5.0 milestone Apr 22, 2024
@nscuro nscuro merged commit 40dead3 into main Apr 22, 2024
10 checks passed
@nscuro nscuro deleted the issue-907-4 branch April 22, 2024 09:59
@github-actions github-actions bot locked as resolved and limited conversation to collaborators May 22, 2024