Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace manual transaction commits with callInTransaction #815

Merged
merged 1 commit into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,17 +192,10 @@ public List<DependencyMetrics> getDependencyMetricsSince(Component component, Da
* Synchronizes VulnerabilityMetrics.
*/
public void synchronizeVulnerabilityMetrics(List<VulnerabilityMetrics> metrics) {
pm.currentTransaction().begin();
// No need for complex updating, just replace the existing ~400 rows with new ones
// Unless we have a contract with clients that the ID of metric records cannot change?

final Query<VulnerabilityMetrics> delete = pm.newQuery("DELETE FROM org.dependencytrack.model.VulnerabilityMetrics");
delete.execute();

// This still does ~400 queries, probably because not all databases can do bulk insert with autogenerated PKs
// Or because Datanucleus is trying to be smart as it wants to cache all these instances
pm.makePersistentAll(metrics);
pm.currentTransaction().commit();
runInTransaction(() -> {
pm.newQuery("DELETE FROM org.dependencytrack.model.VulnerabilityMetrics").execute();
pm.makePersistentAll(metrics);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,16 @@ public NotificationPublisher getDefaultNotificationPublisherByName(final String
public NotificationPublisher createNotificationPublisher(final String name, final String description,
final String publisherClass, final String templateContent,
final String templateMimeType, final boolean defaultPublisher) {
pm.currentTransaction().begin();
final NotificationPublisher publisher = new NotificationPublisher();
publisher.setName(name);
publisher.setDescription(description);
publisher.setPublisherClass(publisherClass);
publisher.setTemplate(templateContent);
publisher.setTemplateMimeType(templateMimeType);
publisher.setDefaultPublisher(defaultPublisher);
pm.makePersistent(publisher);
pm.currentTransaction().commit();
pm.getFetchPlan().addGroup(NotificationPublisher.FetchGroup.ALL.name());
return getObjectById(NotificationPublisher.class, publisher.getId());
return callInTransaction(() -> {
final NotificationPublisher publisher = new NotificationPublisher();
publisher.setName(name);
publisher.setDescription(description);
publisher.setPublisherClass(publisherClass);
publisher.setTemplate(templateContent);
publisher.setTemplateMimeType(templateMimeType);
publisher.setDefaultPublisher(defaultPublisher);
return pm.makePersistent(publisher);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,25 +775,26 @@ public List<ProjectProperty> getProjectProperties(final Project project) {
* @param project a Project object
* @param tags a List of Tag objects
*/
@SuppressWarnings("unchecked")
@Override
public void bind(Project project, List<Tag> tags) {
final Query<Tag> query = pm.newQuery(Tag.class, "projects.contains(:project)");
final List<Tag> currentProjectTags = (List<Tag>) query.execute(project);
pm.currentTransaction().begin();
for (final Tag tag : currentProjectTags) {
if (!tags.contains(tag)) {
tag.getProjects().remove(project);
runInTransaction(() -> {
final Query<Tag> query = pm.newQuery(Tag.class, "projects.contains(:project)");
query.setParameters(project);
final List<Tag> currentProjectTags = executeAndCloseList(query);

for (final Tag tag : currentProjectTags) {
if (!tags.contains(tag)) {
tag.getProjects().remove(project);
}
}
}
project.setTags(tags);
for (final Tag tag : tags) {
final List<Project> projects = tag.getProjects();
if (!projects.contains(project)) {
projects.add(project);
project.setTags(tags);
for (final Tag tag : tags) {
final List<Project> projects = tag.getProjects();
if (!projects.contains(project)) {
projects.add(project);
}
}
}
pm.currentTransaction().commit();
});
}

/**
Expand Down
72 changes: 16 additions & 56 deletions src/main/java/org/dependencytrack/persistence/QueryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import alpine.persistence.AlpineQueryManager;
import alpine.persistence.OrderDirection;
import alpine.persistence.PaginatedResult;
import alpine.persistence.ScopedCustomization;
import alpine.resources.AlpineRequest;
import alpine.server.util.DbUtil;
import com.github.packageurl.PackageURL;
Expand Down Expand Up @@ -117,9 +118,10 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static org.datanucleus.PropertyNames.PROPERTY_QUERY_SQL_ALLOWALL;
import static org.dependencytrack.proto.vulnanalysis.v1.ScanStatus.SCAN_STATUS_FAILED;

/**
Expand Down Expand Up @@ -1483,70 +1485,28 @@ public <T> T getObjectByUuid(final Class<T> clazz, final UUID uuid, final List<S
}
}

/**
* Convenience method to execute a given {@link Runnable} within the context of a {@link Transaction}.
* <p>
* Eventually, this may be moved to {@link alpine.persistence.AbstractAlpineQueryManager}.
*
* @param runnable The {@link Runnable} to execute
* @since 4.6.0
*/
public void runInTransaction(final Runnable runnable) {
final Transaction trx = pm.currentTransaction();
try {
trx.begin();
runnable.run();
trx.commit();
} finally {
if (trx.isActive()) {
trx.rollback();
}
}
}

/**
* Convenience method to execute a given {@link Supplier} within the context of a {@link Transaction}.
*
* @param supplier The {@link Supplier} to execute
* @param <T> Type of the result of {@code supplier}
* @return The result of the execution of {@code supplier}
*/
public <T> T runInTransaction(final Supplier<T> supplier) {
final Transaction trx = pm.currentTransaction();
try {
trx.begin();
final T result = supplier.get();
trx.commit();
return result;
} finally {
if (trx.isActive()) {
trx.rollback();
}
}
}

public <T> T runInRetryableTransaction(final Supplier<T> supplier, final Predicate<Throwable> retryOn) {
public <T> T runInRetryableTransaction(final Callable<T> supplier, final Predicate<Throwable> retryOn) {
final var retryConfig = RetryConfig.custom()
.retryOnException(retryOn)
.maxAttempts(3)
.build();

return Retry.of("runInRetryableTransaction", retryConfig)
.executeSupplier(() -> runInTransaction(supplier));
.executeSupplier(() -> callInTransaction(supplier));
}

public void recursivelyDeleteTeam(Team team) {
pm.setProperty("datanucleus.query.sql.allowAll", true);
final Transaction trx = pm.currentTransaction();
pm.currentTransaction().begin();
pm.deletePersistentAll(team.getApiKeys());
String aclDeleteQuery = """
DELETE FROM "PROJECT_ACCESS_TEAMS" WHERE "TEAM_ID" = ?
""";
final Query<?> query = pm.newQuery(JDOQuery.SQL_QUERY_LANGUAGE, aclDeleteQuery);
query.executeWithArray(team.getId());
pm.deletePersistent(team);
pm.currentTransaction().commit();
runInTransaction(() -> {
pm.deletePersistentAll(team.getApiKeys());

try (var ignored = new ScopedCustomization(pm).withProperty(PROPERTY_QUERY_SQL_ALLOWALL, "true")) {
final Query<?> aclDeleteQuery = pm.newQuery(JDOQuery.SQL_QUERY_LANGUAGE, """
DELETE FROM "PROJECT_ACCESS_TEAMS" WHERE "PROJECT_ACCESS_TEAMS"."TEAM_ID" = ?""");
executeAndCloseWithArray(aclDeleteQuery, team.getId());
}

pm.deletePersistent(team);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,15 +246,16 @@ public void addVulnerability(Vulnerability vulnerability, Component component, A
* @param component the component unaffected by the vulnerabiity
*/
public void removeVulnerability(Vulnerability vulnerability, Component component) {
if (contains(vulnerability, component)) {
pm.currentTransaction().begin();
component.removeVulnerability(vulnerability);
pm.currentTransaction().commit();
}
final FindingAttribution fa = getFindingAttribution(vulnerability, component);
if (fa != null) {
delete(fa);
}
runInTransaction(() -> {
if (contains(vulnerability, component)) {
component.removeVulnerability(vulnerability);
}

final FindingAttribution fa = getFindingAttribution(vulnerability, component);
if (fa != null) {
delete(fa);
}
});
}

/**
Expand Down Expand Up @@ -640,7 +641,7 @@ SELECT COUNT(DISTINCT this.project.id)
}

public synchronized VulnerabilityAlias synchronizeVulnerabilityAlias(final VulnerabilityAlias alias) {
return runInTransaction(() -> {
return callInTransaction(() -> {
// Query existing aliases that match AT LEAST ONE identifier of the given alias.
//
// For each data source, we want to know the existing aliases where the respective identifier either:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ public Response cloneProject(CloneProjectRequest jsonRequest) {
}
LOGGER.info("Project " + sourceProject + " is being cloned by " + super.getPrincipal().getName());
CloneProjectEvent event = new CloneProjectEvent(jsonRequest);
final Response response = qm.runInTransaction(() -> {
final Response response = qm.callInTransaction(() -> {
WorkflowState workflowState = qm.getWorkflowStateByTokenAndStep(event.getChainIdentifier(), WorkflowStep.PROJECT_CLONE);
if (workflowState != null) {
if (isEventBeingProcessed(event.getChainIdentifier()) || !workflowState.getStatus().isTerminal()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public Response updateApiKeyComment(@PathParam("apikey") final String apikey,
try (final var qm = new QueryManager()) {
qm.getPersistenceManager().setProperty(PROPERTY_RETAIN_VALUES, "true");

return qm.runInTransaction(() -> {
return qm.callInTransaction(() -> {
final ApiKey apiKey = qm.getApiKey(apikey);
if (apiKey == null) {
return Response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public Response triggerVulnerabilityPolicyBundleSync() {
qm.getPersistenceManager().currentTransaction().setSerializeRead(true);

final UUID token = VulnerabilityPolicyFetchEvent.CHAIN_IDENTIFIER;
final Response response = qm.runInTransaction(() -> {
final Response response = qm.callInTransaction(() -> {
WorkflowState workflowState = qm.getWorkflowStateByTokenAndStep(token, WorkflowStep.POLICY_BUNDLE_SYNC);
if (workflowState != null) {
if (isEventBeingProcessed(token) || !workflowState.getStatus().isTerminal()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ private ProcessedBom processBom(final Context ctx, final ConsumedBom bom) {
// See https://www.datanucleus.org/products/accessplatform_6_0/jdo/persistence.html#lifecycle
qm.getPersistenceManager().setProperty(PROPERTY_RETAIN_VALUES, "true");

return qm.runInTransaction(() -> {
return qm.callInTransaction(() -> {
final Project persistentProject = processProject(ctx, qm, bom.project(), bom.projectMetadata());

LOGGER.info("Processing %d components".formatted(bom.components().size()));
Expand Down Expand Up @@ -1031,7 +1031,7 @@ private static List<ComponentRepositoryMetaAnalysisEvent> createRepoMetaAnalysis
continue;
}

final boolean shouldFetchIntegrityData = qm.runInTransaction(() -> prepareIntegrityMetaComponent(qm, component));
final boolean shouldFetchIntegrityData = qm.callInTransaction(() -> prepareIntegrityMetaComponent(qm, component));
if (shouldFetchIntegrityData) {
events.add(new ComponentRepositoryMetaAnalysisEvent(
component.getUuid(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private static void transitionTimedOutStepsToFailed(final QueryManager qm, final
int stepsCancelled = 0;
try {
for (final WorkflowState state : failedQuery.executeList()) {
stepsCancelled += qm.runInTransaction(() -> {
stepsCancelled += qm.callInTransaction(() -> {
final Date now = new Date();
state.setStatus(WorkflowStatus.FAILED);
state.setFailureReason("Timed out");
Expand Down Expand Up @@ -207,7 +207,7 @@ HAVING max(updatedAt) < :cutoff
).isEmpty()
""");
try {
stepsDeleted += qm.runInTransaction(() -> (long) workflowDeleteQuery.executeWithMap(Map.of(
stepsDeleted += qm.callInTransaction(() -> (long) workflowDeleteQuery.executeWithMap(Map.of(
"tokens", tokenBatch,
"nonTerminalStatuses", Set.of(WorkflowStatus.PENDING, WorkflowStatus.TIMED_OUT)
)));
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true">
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date %level [%logger{0}] %msg%replace( [%mdc{}]){' \[\]', ''}%n</pattern>
Expand Down