Migrate RepositoryMetaResultProcessor from Kafka Streams to Parallel Consumer #554

Merged · 2 commits · Mar 26, 2024
@@ -37,6 +37,8 @@ public void contextInitialized(final ServletContextEvent event) {
 
         PROCESSOR_MANAGER.registerProcessor(VulnerabilityMirrorProcessor.PROCESSOR_NAME,
                 KafkaTopics.NEW_VULNERABILITY, new VulnerabilityMirrorProcessor());
+        PROCESSOR_MANAGER.registerProcessor(RepositoryMetaResultProcessor.PROCESSOR_NAME,
+                KafkaTopics.REPO_META_ANALYSIS_RESULT, new RepositoryMetaResultProcessor());
 
         PROCESSOR_MANAGER.startAll();
     }
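The PROCESSOR_NAME under which the processor is registered ("repo.meta.analysis.result") keys the new alpine.kafka.processor.* configuration block added to application.properties further down. For orientation, here is a rough sketch of the Confluent Parallel Consumer wiring a manager like this typically performs per processor. This is not the project's actual ProcessorManager code; the bootstrap address, the concurrency value, and the byte-array pass-through are illustrative assumptions.

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.List;
import java.util.Properties;

class ParallelConsumerWiringSketch {

    public static void main(String[] args) {
        final var consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "dtrack-apiserver-processor");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        final var options = ParallelConsumerOptions.<String, byte[]>builder()
                .consumer(new KafkaConsumer<>(consumerConfig))
                .ordering(ProcessingOrder.KEY) // per-key ordering, see processing.order=key below
                .maxConcurrency(16)            // illustrative; derived from max.concurrency in practice
                .build();

        final ParallelStreamProcessor<String, byte[]> parallelConsumer =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        parallelConsumer.subscribe(List.of("dtrack.repo-meta-analysis.result"));
        parallelConsumer.poll(ctx -> {
            // Deserialize ctx.getSingleConsumerRecord().value() into an
            // AnalysisResult and hand it to RepositoryMetaResultProcessor#process.
        });
    }
}

Unlike a Kafka Streams topology, which parallelizes only up to the partition count, the Parallel Consumer can process many keys of a single partition concurrently while preserving per-key ordering when ProcessingOrder.KEY is selected.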
@@ -16,29 +16,25 @@
  * SPDX-License-Identifier: Apache-2.0
  * Copyright (c) OWASP Foundation. All Rights Reserved.
  */
-package org.dependencytrack.event.kafka.streams.processor;
+package org.dependencytrack.event.kafka.processor;
 
 import alpine.common.logging.Logger;
-import alpine.common.metrics.Metrics;
 import com.github.packageurl.MalformedPackageURLException;
 import com.github.packageurl.PackageURL;
-import io.micrometer.core.instrument.Timer;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.Record;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.dependencytrack.event.kafka.processor.api.Processor;
+import org.dependencytrack.event.kafka.processor.exception.ProcessingException;
 import org.dependencytrack.model.FetchStatus;
 import org.dependencytrack.model.IntegrityMetaComponent;
 import org.dependencytrack.model.RepositoryMetaComponent;
 import org.dependencytrack.model.RepositoryType;
 import org.dependencytrack.persistence.QueryManager;
 import org.dependencytrack.proto.repometaanalysis.v1.AnalysisResult;
-import org.postgresql.util.PSQLState;
+import org.dependencytrack.util.PersistenceUtil;
 
-import javax.jdo.JDODataStoreException;
 import javax.jdo.PersistenceManager;
 import javax.jdo.Query;
-import javax.jdo.Transaction;
-import java.sql.SQLException;
 import java.util.Date;
 import java.util.Optional;
 
@@ -47,16 +43,14 @@
 /**
  * A {@link Processor} responsible for processing result of component repository meta analyses.
  */
-public class RepositoryMetaResultProcessor implements Processor<String, AnalysisResult, Void, Void> {
+public class RepositoryMetaResultProcessor implements Processor<String, AnalysisResult> {
 
+    static final String PROCESSOR_NAME = "repo.meta.analysis.result";
+
     private static final Logger LOGGER = Logger.getLogger(RepositoryMetaResultProcessor.class);
-    private static final Timer TIMER = Timer.builder("repo_meta_result_processing")
-            .description("Time taken to process repository meta analysis results")
-            .register(Metrics.getRegistry());
 
     @Override
-    public void process(final Record<String, AnalysisResult> record) {
-        final Timer.Sample timerSample = Timer.start();
+    public void process(final ConsumerRecord<String, AnalysisResult> record) throws ProcessingException {
         if (!isRecordValid(record)) {
             return;
         }
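For reference, the new org.dependencytrack.event.kafka.processor.api.Processor contract can be inferred from this hunk: a handler that receives one deserialized ConsumerRecord per invocation and signals failure by throwing ProcessingException. A minimal sketch, assuming a single-method interface (the real one may declare additional members):

package org.dependencytrack.event.kafka.processor.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dependencytrack.event.kafka.processor.exception.ProcessingException;

/**
 * Sketch of the processor contract implied by the diff: one deserialized
 * record per invocation; a thrown ProcessingException marks the record as
 * failed so the manager can apply the configured retry policy.
 */
public interface Processor<K, V> {

    void process(ConsumerRecord<K, V> record) throws ProcessingException;
}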
@@ -67,13 +61,11 @@ public void process(final Record<String, AnalysisResult> record) {
                 performIntegrityCheck(integrityMetaComponent, record.value(), qm);
             }
         } catch (Exception e) {
-            LOGGER.error("An unexpected error occurred while processing record %s".formatted(record), e);
-        } finally {
-            timerSample.stop(TIMER);
+            throw new ProcessingException(e);
         }
     }
 
-    private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager queryManager, final Record<String, AnalysisResult> record) throws MalformedPackageURLException {
+    private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager queryManager, final ConsumerRecord<String, AnalysisResult> record) throws MalformedPackageURLException {
         final AnalysisResult result = record.value();
         PackageURL purl = new PackageURL(result.getComponent().getPurl());
         if (result.hasIntegrityMeta()) {
@@ -84,86 +76,73 @@ private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager q
         }
     }
 
-    private void synchronizeRepositoryMetadata(final QueryManager queryManager, final Record<String, AnalysisResult> record) throws Exception {
-        PersistenceManager pm = queryManager.getPersistenceManager();
+    private void synchronizeRepositoryMetadata(final QueryManager qm, final ConsumerRecord<String, AnalysisResult> record) throws Exception {
+        final PersistenceManager pm = qm.getPersistenceManager();
         final AnalysisResult result = record.value();
-        PackageURL purl = new PackageURL(result.getComponent().getPurl());
+        final var purl = new PackageURL(result.getComponent().getPurl());
 
         // It is possible that the same meta info is reported for multiple components in parallel,
         // causing unique constraint violations when attempting to insert into the REPOSITORY_META_COMPONENT table.
         // In such cases, we can get away with simply retrying to SELECT+UPDATE or INSERT again. We'll attempt
         // up to 3 times before giving up.
-        for (int i = 0; i < 3; i++) {
-            final Transaction trx = pm.currentTransaction();
-            try {
-                RepositoryMetaComponent repositoryMetaComponentResult = createRepositoryMetaResult(record, pm, purl);
-                if (repositoryMetaComponentResult != null) {
-                    trx.begin();
-                    pm.makePersistent(repositoryMetaComponentResult);
-                    trx.commit();
-                    break; // this means that transaction was successful and we do not need to retry
-                }
-            } catch (JDODataStoreException e) {
-                // TODO: DataNucleus doesn't map constraint violation exceptions very well,
-                // so we have to depend on the exception of the underlying JDBC driver to
-                // tell us what happened. We currently only handle PostgreSQL, but we'll have
-                // to do the same for at least H2 and MSSQL.
-                if (ExceptionUtils.getRootCause(e) instanceof final SQLException se
-                        && PSQLState.UNIQUE_VIOLATION.getState().equals(se.getSQLState())) {
-                    continue; // Retry
-                }
-
-                throw e;
-            } finally {
-                if (trx.isActive()) {
-                    trx.rollback();
-                }
-            }
-        }
+        qm.runInRetryableTransaction(() -> {
+            final RepositoryMetaComponent repositoryMetaComponentResult = createRepositoryMetaResult(record, pm, purl);
+            if (repositoryMetaComponentResult != null) {
+                pm.makePersistent(repositoryMetaComponentResult);
+            }
+
+            return null;
+        }, PersistenceUtil::isUniqueConstraintViolation);
     }
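The hand-rolled retry loop is gone in favor of QueryManager#runInRetryableTransaction, which takes the transactional work plus a predicate that decides which failures are retryable; PersistenceUtil::isUniqueConstraintViolation replaces the PostgreSQL-specific SQL-state check, and with it the direct JDBC-driver dependency. A minimal sketch of the helper's semantics, reconstructed from the removed loop (the actual QueryManager implementation may differ in attempt budget and error handling):

import javax.jdo.PersistenceManager;
import javax.jdo.Transaction;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in for the QueryManager method, for illustration only.
final class RetryableTransactions {

    static <T> T runInRetryableTransaction(final PersistenceManager pm,
                                           final Supplier<T> work,
                                           final Predicate<Throwable> retryOn) {
        for (int attempt = 1; ; attempt++) {
            final Transaction trx = pm.currentTransaction();
            try {
                trx.begin();
                final T result = work.get();
                trx.commit();
                return result;
            } catch (RuntimeException e) {
                // Give up unless the failure is one the caller declared
                // retryable (e.g. a unique constraint violation caused by a
                // concurrent insert of the same row).
                if (attempt >= 3 || !retryOn.test(e)) {
                    throw e;
                }
            } finally {
                if (trx.isActive()) {
                    trx.rollback(); // roll back before the next attempt
                }
            }
        }
    }
}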

-    private RepositoryMetaComponent createRepositoryMetaResult(Record<String, AnalysisResult> incomingAnalysisResultRecord, PersistenceManager pm, PackageURL purl) throws Exception {
+    private RepositoryMetaComponent createRepositoryMetaResult(ConsumerRecord<String, AnalysisResult> incomingAnalysisResultRecord, PersistenceManager pm, PackageURL purl) {
         final AnalysisResult result = incomingAnalysisResultRecord.value();
-        if (result.hasLatestVersion()) {
-            try (final Query<RepositoryMetaComponent> query = pm.newQuery(RepositoryMetaComponent.class)) {
-                query.setFilter("repositoryType == :repositoryType && namespace == :namespace && name == :name");
-                query.setParameters(
-                        RepositoryType.resolve(purl),
-                        purl.getNamespace(),
-                        purl.getName()
-                );
-                RepositoryMetaComponent persistentRepoMetaComponent = query.executeUnique();
-                if (persistentRepoMetaComponent == null) {
-                    persistentRepoMetaComponent = new RepositoryMetaComponent();
-                }
-
-                if (persistentRepoMetaComponent.getLastCheck() != null
-                        && persistentRepoMetaComponent.getLastCheck().after(new Date(incomingAnalysisResultRecord.timestamp()))) {
-                    LOGGER.warn("""
-                            Received repository meta information for %s that is older\s
-                            than what's already in the database; Discarding
-                            """.formatted(purl));
-                    return null;
-                }
-
-                persistentRepoMetaComponent.setRepositoryType(RepositoryType.resolve(purl));
-                persistentRepoMetaComponent.setNamespace(purl.getNamespace());
-                persistentRepoMetaComponent.setName(purl.getName());
-                if (result.hasLatestVersion()) {
-                    persistentRepoMetaComponent.setLatestVersion(result.getLatestVersion());
-                }
-                if (result.hasPublished()) {
-                    persistentRepoMetaComponent.setPublished(new Date(result.getPublished().getSeconds() * 1000));
-                }
-                persistentRepoMetaComponent.setLastCheck(new Date(incomingAnalysisResultRecord.timestamp()));
-                return persistentRepoMetaComponent;
-            }
-        } else {
+        if (!result.hasLatestVersion()) {
             return null;
         }
+
+        final Query<RepositoryMetaComponent> query = pm.newQuery(RepositoryMetaComponent.class);
+        query.setFilter("repositoryType == :repositoryType && namespace == :namespace && name == :name");
+        query.setParameters(
+                RepositoryType.resolve(purl),
+                purl.getNamespace(),
+                purl.getName()
+        );
+
+        RepositoryMetaComponent persistentRepoMetaComponent;
+        try {
+            persistentRepoMetaComponent = query.executeUnique();
+        } finally {
+            query.closeAll();
+        }
+
+        if (persistentRepoMetaComponent == null) {
+            persistentRepoMetaComponent = new RepositoryMetaComponent();
+        }
+
+        if (persistentRepoMetaComponent.getLastCheck() != null
+                && persistentRepoMetaComponent.getLastCheck().after(new Date(incomingAnalysisResultRecord.timestamp()))) {
+            LOGGER.warn("""
+                    Received repository meta information for %s that is older\s
+                    than what's already in the database; Discarding
+                    """.formatted(purl));
+            return null;
+        }
+
+        persistentRepoMetaComponent.setRepositoryType(RepositoryType.resolve(purl));
+        persistentRepoMetaComponent.setNamespace(purl.getNamespace());
+        persistentRepoMetaComponent.setName(purl.getName());
+        if (result.hasLatestVersion()) {
+            persistentRepoMetaComponent.setLatestVersion(result.getLatestVersion());
+        }
+        if (result.hasPublished()) {
+            persistentRepoMetaComponent.setPublished(new Date(result.getPublished().getSeconds() * 1000));
+        }
+        persistentRepoMetaComponent.setLastCheck(new Date(incomingAnalysisResultRecord.timestamp()));
+        return persistentRepoMetaComponent;
     }
 
-    private IntegrityMetaComponent synchronizeIntegrityMetaResult(final Record<String, AnalysisResult> incomingAnalysisResultRecord, QueryManager queryManager, PackageURL purl) {
+    private IntegrityMetaComponent synchronizeIntegrityMetaResult(final ConsumerRecord<String, AnalysisResult> incomingAnalysisResultRecord, QueryManager queryManager, PackageURL purl) {
         final AnalysisResult result = incomingAnalysisResultRecord.value();
         IntegrityMetaComponent persistentIntegrityMetaComponent = queryManager.getIntegrityMetaComponent(purl.toString());
         if (persistentIntegrityMetaComponent != null && persistentIntegrityMetaComponent.getStatus() != null && persistentIntegrityMetaComponent.getStatus().equals(FetchStatus.PROCESSED)) {
@@ -178,10 +157,10 @@ private IntegrityMetaComponent synchronizeIntegrityMetaResult(final Record<Strin
 
         if (result.getIntegrityMeta().hasMd5() || result.getIntegrityMeta().hasSha1() || result.getIntegrityMeta().hasSha256()
                 || result.getIntegrityMeta().hasSha512() || result.getIntegrityMeta().hasCurrentVersionLastModified()) {
-            Optional.ofNullable(result.getIntegrityMeta().getMd5()).ifPresent(persistentIntegrityMetaComponent::setMd5);
-            Optional.ofNullable(result.getIntegrityMeta().getSha1()).ifPresent(persistentIntegrityMetaComponent::setSha1);
-            Optional.ofNullable(result.getIntegrityMeta().getSha256()).ifPresent(persistentIntegrityMetaComponent::setSha256);
-            Optional.ofNullable(result.getIntegrityMeta().getSha512()).ifPresent(persistentIntegrityMetaComponent::setSha512);
+            Optional.of(result.getIntegrityMeta().getMd5()).filter(StringUtils::isNotBlank).ifPresent(persistentIntegrityMetaComponent::setMd5);
+            Optional.of(result.getIntegrityMeta().getSha1()).filter(StringUtils::isNotBlank).ifPresent(persistentIntegrityMetaComponent::setSha1);
+            Optional.of(result.getIntegrityMeta().getSha256()).filter(StringUtils::isNotBlank).ifPresent(persistentIntegrityMetaComponent::setSha256);
+            Optional.of(result.getIntegrityMeta().getSha512()).filter(StringUtils::isNotBlank).ifPresent(persistentIntegrityMetaComponent::setSha512);
             persistentIntegrityMetaComponent.setPurl(result.getComponent().getPurl());
             persistentIntegrityMetaComponent.setRepositoryUrl(result.getIntegrityMeta().getMetaSourceUrl());
             persistentIntegrityMetaComponent.setPublishedAt(result.getIntegrityMeta().hasCurrentVersionLastModified() ? new Date(result.getIntegrityMeta().getCurrentVersionLastModified().getSeconds() * 1000) : null);
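A note on the hash handling fix: proto3 string accessors never return null (an unset field comes back as an empty string), so the old Optional.ofNullable(...) chains were always present and could overwrite previously stored hashes with "". Filtering on StringUtils::isNotBlank copies only hashes that were actually reported. A small self-contained illustration:

import org.apache.commons.lang3.StringUtils;

import java.util.Optional;

class ProtoStringFieldDemo {

    public static void main(String[] args) {
        // An unset proto3 string field is returned as "", never null.
        final String md5FromProto = "";

        // Old pattern: always present, would write the empty string through.
        Optional.ofNullable(md5FromProto)
                .ifPresent(md5 -> System.out.println("old pattern writes: '" + md5 + "'"));

        // New pattern: blank values are filtered out, the setter is never called.
        Optional.of(md5FromProto)
                .filter(StringUtils::isNotBlank)
                .ifPresent(md5 -> System.out.println("new pattern writes: " + md5));
    }
}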
@@ -198,7 +177,7 @@ private IntegrityMetaComponent synchronizeIntegrityMetaResult(final Record<Strin
         return queryManager.updateIntegrityMetaComponent(persistentIntegrityMetaComponent);
     }
 
-    private static boolean isRecordValid(final Record<String, AnalysisResult> record) {
+    private static boolean isRecordValid(final ConsumerRecord<String, AnalysisResult> record) {
         final AnalysisResult result = record.value();
         if (!result.hasComponent()) {
             LOGGER.warn("""
@@ -41,7 +41,6 @@
 import org.dependencytrack.event.ProjectPolicyEvaluationEvent;
 import org.dependencytrack.event.kafka.KafkaTopics;
 import org.dependencytrack.event.kafka.streams.processor.DelayedBomProcessedNotificationProcessor;
-import org.dependencytrack.event.kafka.streams.processor.RepositoryMetaResultProcessor;
 import org.dependencytrack.event.kafka.streams.processor.VulnerabilityScanResultProcessor;
 import org.dependencytrack.model.VulnerabilityScan;
 import org.dependencytrack.model.WorkflowState;
@@ -235,12 +234,6 @@ Topology createTopology() {
             Event.dispatch(policyEvaluationEvent);
         }, Named.as("trigger_policy_evaluation"));
 
-        streamsBuilder
-                .stream(KafkaTopics.REPO_META_ANALYSIS_RESULT.name(),
-                        Consumed.with(KafkaTopics.REPO_META_ANALYSIS_RESULT.keySerde(), KafkaTopics.REPO_META_ANALYSIS_RESULT.valueSerde())
-                                .withName("consume_from_%s_topic".formatted(KafkaTopics.REPO_META_ANALYSIS_RESULT.name())))
-                .process(RepositoryMetaResultProcessor::new, Named.as("process_repo_meta_analysis_result"));
-
         return streamsBuilder.build(streamsProperties);
     }
12 changes: 12 additions & 0 deletions src/main/resources/application.properties
@@ -504,6 +504,18 @@ alpine.kafka.processor.vuln.mirror.retry.max.delay.ms=180000
 alpine.kafka.processor.vuln.mirror.consumer.group.id=dtrack-apiserver-processor
 alpine.kafka.processor.vuln.mirror.consumer.auto.offset.reset=earliest
 
+# Required
+# Configures the Kafka processor responsible for ingesting repository metadata
+# analysis results from the dtrack.repo-meta-analysis.result topic.
+alpine.kafka.processor.repo.meta.analysis.result.max.concurrency=-1
+alpine.kafka.processor.repo.meta.analysis.result.processing.order=key
+alpine.kafka.processor.repo.meta.analysis.result.retry.initial.delay.ms=1000
+alpine.kafka.processor.repo.meta.analysis.result.retry.multiplier=2
+alpine.kafka.processor.repo.meta.analysis.result.retry.randomization.factor=0.3
+alpine.kafka.processor.repo.meta.analysis.result.retry.max.delay.ms=180000
+alpine.kafka.processor.repo.meta.analysis.result.consumer.group.id=dtrack-apiserver-processor
+alpine.kafka.processor.repo.meta.analysis.result.consumer.auto.offset.reset=earliest
 
 # Scheduling tasks after 3 minutes (3*60*1000) of starting application
 task.scheduler.initial.delay=180000

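Read together, the retry properties describe an exponential backoff: delays start at retry.initial.delay.ms, grow by retry.multiplier per attempt, are capped at retry.max.delay.ms, and are jittered by plus or minus retry.randomization.factor. A short worked example, assuming these standard backoff semantics:

import java.util.concurrent.ThreadLocalRandom;

// Worked example of the retry schedule implied by the properties above,
// assuming delay_n = initial * multiplier^n, capped at max, then jittered.
class RetryScheduleExample {

    public static void main(String[] args) {
        final long initialDelayMs = 1_000;   // retry.initial.delay.ms
        final double multiplier = 2.0;       // retry.multiplier
        final double randomization = 0.3;    // retry.randomization.factor
        final long maxDelayMs = 180_000;     // retry.max.delay.ms

        double delay = initialDelayMs;
        for (int attempt = 1; attempt <= 10; attempt++) {
            final double jitter = 1 + ThreadLocalRandom.current().nextDouble(-randomization, randomization);
            System.out.printf("attempt %2d: base %.0f ms, jittered %.0f ms%n",
                    attempt, delay, delay * jitter);
            delay = Math.min(delay * multiplier, maxDelayMs);
        }
    }
}

With these values the schedule runs roughly 1 s, 2 s, 4 s, 8 s, and saturates at the 180 s cap after about eight doublings.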