21
21
import alpine .common .logging .Logger ;
22
22
import com .github .packageurl .MalformedPackageURLException ;
23
23
import com .github .packageurl .PackageURL ;
24
- import org .apache .commons .lang3 .exception . ExceptionUtils ;
24
+ import org .apache .commons .lang3 .StringUtils ;
25
25
import org .apache .kafka .clients .consumer .ConsumerRecord ;
26
26
import org .dependencytrack .event .kafka .processor .api .Processor ;
27
27
import org .dependencytrack .event .kafka .processor .exception .ProcessingException ;
31
31
import org .dependencytrack .model .RepositoryType ;
32
32
import org .dependencytrack .persistence .QueryManager ;
33
33
import org .dependencytrack .proto .repometaanalysis .v1 .AnalysisResult ;
34
- import org .postgresql .util .PSQLState ;
34
+ import org .dependencytrack .util .PersistenceUtil ;
35
35
36
- import javax .jdo .JDODataStoreException ;
37
36
import javax .jdo .PersistenceManager ;
38
37
import javax .jdo .Query ;
39
- import javax .jdo .Transaction ;
40
- import java .sql .SQLException ;
41
38
import java .util .Date ;
42
39
import java .util .Optional ;
43
40
@@ -79,83 +76,70 @@ private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager q
79
76
}
80
77
}
81
78
82
- private void synchronizeRepositoryMetadata (final QueryManager queryManager , final ConsumerRecord <String , AnalysisResult > record ) throws Exception {
83
- PersistenceManager pm = queryManager .getPersistenceManager ();
79
+ private void synchronizeRepositoryMetadata (final QueryManager qm , final ConsumerRecord <String , AnalysisResult > record ) throws Exception {
80
+ final PersistenceManager pm = qm .getPersistenceManager ();
84
81
final AnalysisResult result = record .value ();
85
- PackageURL purl = new PackageURL (result .getComponent ().getPurl ());
82
+ final var purl = new PackageURL (result .getComponent ().getPurl ());
86
83
87
84
// It is possible that the same meta info is reported for multiple components in parallel,
88
85
// causing unique constraint violations when attempting to insert into the REPOSITORY_META_COMPONENT table.
89
86
// In such cases, we can get away with simply retrying to SELECT+UPDATE or INSERT again. We'll attempt
90
87
// up to 3 times before giving up.
91
- for (int i = 0 ; i < 3 ; i ++) {
92
- final Transaction trx = pm .currentTransaction ();
93
- try {
94
- RepositoryMetaComponent repositoryMetaComponentResult = createRepositoryMetaResult (record , pm , purl );
95
- if (repositoryMetaComponentResult != null ) {
96
- trx .begin ();
97
- pm .makePersistent (repositoryMetaComponentResult );
98
- trx .commit ();
99
- break ; // this means that transaction was successful and we do not need to retry
100
- }
101
- } catch (JDODataStoreException e ) {
102
- // TODO: DataNucleus doesn't map constraint violation exceptions very well,
103
- // so we have to depend on the exception of the underlying JDBC driver to
104
- // tell us what happened. We currently only handle PostgreSQL, but we'll have
105
- // to do the same for at least H2 and MSSQL.
106
- if (ExceptionUtils .getRootCause (e ) instanceof final SQLException se
107
- && PSQLState .UNIQUE_VIOLATION .getState ().equals (se .getSQLState ())) {
108
- continue ; // Retry
109
- }
110
-
111
- throw e ;
112
- } finally {
113
- if (trx .isActive ()) {
114
- trx .rollback ();
115
- }
88
+ qm .runInRetryableTransaction (() -> {
89
+ final RepositoryMetaComponent repositoryMetaComponentResult = createRepositoryMetaResult (record , pm , purl );
90
+ if (repositoryMetaComponentResult != null ) {
91
+ pm .makePersistent (repositoryMetaComponentResult );
116
92
}
117
- }
93
+
94
+ return null ;
95
+ }, PersistenceUtil ::isUniqueConstraintViolation );
118
96
}
119
97
120
- private RepositoryMetaComponent createRepositoryMetaResult (ConsumerRecord <String , AnalysisResult > incomingAnalysisResultRecord , PersistenceManager pm , PackageURL purl ) throws Exception {
98
+ private RepositoryMetaComponent createRepositoryMetaResult (ConsumerRecord <String , AnalysisResult > incomingAnalysisResultRecord , PersistenceManager pm , PackageURL purl ) {
121
99
final AnalysisResult result = incomingAnalysisResultRecord .value ();
122
- if (result .hasLatestVersion ()) {
123
- try (final Query <RepositoryMetaComponent > query = pm .newQuery (RepositoryMetaComponent .class )) {
124
- query .setFilter ("repositoryType == :repositoryType && namespace == :namespace && name == :name" );
125
- query .setParameters (
126
- RepositoryType .resolve (purl ),
127
- purl .getNamespace (),
128
- purl .getName ()
129
- );
130
- RepositoryMetaComponent persistentRepoMetaComponent = query .executeUnique ();
131
- if (persistentRepoMetaComponent == null ) {
132
- persistentRepoMetaComponent = new RepositoryMetaComponent ();
133
- }
134
-
135
- if (persistentRepoMetaComponent .getLastCheck () != null
136
- && persistentRepoMetaComponent .getLastCheck ().after (new Date (incomingAnalysisResultRecord .timestamp ()))) {
137
- LOGGER .warn ("""
138
- Received repository meta information for %s that is older\s
139
- than what's already in the database; Discarding
140
- """ .formatted (purl ));
141
- return null ;
142
- }
143
-
144
- persistentRepoMetaComponent .setRepositoryType (RepositoryType .resolve (purl ));
145
- persistentRepoMetaComponent .setNamespace (purl .getNamespace ());
146
- persistentRepoMetaComponent .setName (purl .getName ());
147
- if (result .hasLatestVersion ()) {
148
- persistentRepoMetaComponent .setLatestVersion (result .getLatestVersion ());
149
- }
150
- if (result .hasPublished ()) {
151
- persistentRepoMetaComponent .setPublished (new Date (result .getPublished ().getSeconds () * 1000 ));
152
- }
153
- persistentRepoMetaComponent .setLastCheck (new Date (incomingAnalysisResultRecord .timestamp ()));
154
- return persistentRepoMetaComponent ;
155
- }
156
- } else {
100
+ if (!result .hasLatestVersion ()) {
101
+ return null ;
102
+ }
103
+
104
+ final Query <RepositoryMetaComponent > query = pm .newQuery (RepositoryMetaComponent .class );
105
+ query .setFilter ("repositoryType == :repositoryType && namespace == :namespace && name == :name" );
106
+ query .setParameters (
107
+ RepositoryType .resolve (purl ),
108
+ purl .getNamespace (),
109
+ purl .getName ()
110
+ );
111
+
112
+ RepositoryMetaComponent persistentRepoMetaComponent ;
113
+ try {
114
+ persistentRepoMetaComponent = query .executeUnique ();
115
+ } finally {
116
+ query .closeAll ();
117
+ }
118
+
119
+ if (persistentRepoMetaComponent == null ) {
120
+ persistentRepoMetaComponent = new RepositoryMetaComponent ();
121
+ }
122
+
123
+ if (persistentRepoMetaComponent .getLastCheck () != null
124
+ && persistentRepoMetaComponent .getLastCheck ().after (new Date (incomingAnalysisResultRecord .timestamp ()))) {
125
+ LOGGER .warn ("""
126
+ Received repository meta information for %s that is older\s
127
+ than what's already in the database; Discarding
128
+ """ .formatted (purl ));
157
129
return null ;
158
130
}
131
+
132
+ persistentRepoMetaComponent .setRepositoryType (RepositoryType .resolve (purl ));
133
+ persistentRepoMetaComponent .setNamespace (purl .getNamespace ());
134
+ persistentRepoMetaComponent .setName (purl .getName ());
135
+ if (result .hasLatestVersion ()) {
136
+ persistentRepoMetaComponent .setLatestVersion (result .getLatestVersion ());
137
+ }
138
+ if (result .hasPublished ()) {
139
+ persistentRepoMetaComponent .setPublished (new Date (result .getPublished ().getSeconds () * 1000 ));
140
+ }
141
+ persistentRepoMetaComponent .setLastCheck (new Date (incomingAnalysisResultRecord .timestamp ()));
142
+ return persistentRepoMetaComponent ;
159
143
}
160
144
161
145
private IntegrityMetaComponent synchronizeIntegrityMetaResult (final ConsumerRecord <String , AnalysisResult > incomingAnalysisResultRecord , QueryManager queryManager , PackageURL purl ) {
@@ -173,10 +157,10 @@ private IntegrityMetaComponent synchronizeIntegrityMetaResult(final ConsumerReco
173
157
174
158
if (result .getIntegrityMeta ().hasMd5 () || result .getIntegrityMeta ().hasSha1 () || result .getIntegrityMeta ().hasSha256 ()
175
159
|| result .getIntegrityMeta ().hasSha512 () || result .getIntegrityMeta ().hasCurrentVersionLastModified ()) {
176
- Optional .ofNullable (result .getIntegrityMeta ().getMd5 ()).ifPresent (persistentIntegrityMetaComponent ::setMd5 );
177
- Optional .ofNullable (result .getIntegrityMeta ().getSha1 ()).ifPresent (persistentIntegrityMetaComponent ::setSha1 );
178
- Optional .ofNullable (result .getIntegrityMeta ().getSha256 ()).ifPresent (persistentIntegrityMetaComponent ::setSha256 );
179
- Optional .ofNullable (result .getIntegrityMeta ().getSha512 ()).ifPresent (persistentIntegrityMetaComponent ::setSha512 );
160
+ Optional .of (result .getIntegrityMeta ().getMd5 ()). filter ( StringUtils :: isNotBlank ).ifPresent (persistentIntegrityMetaComponent ::setMd5 );
161
+ Optional .of (result .getIntegrityMeta ().getSha1 ()). filter ( StringUtils :: isNotBlank ).ifPresent (persistentIntegrityMetaComponent ::setSha1 );
162
+ Optional .of (result .getIntegrityMeta ().getSha256 ()). filter ( StringUtils :: isNotBlank ).ifPresent (persistentIntegrityMetaComponent ::setSha256 );
163
+ Optional .of (result .getIntegrityMeta ().getSha512 ()). filter ( StringUtils :: isNotBlank ).ifPresent (persistentIntegrityMetaComponent ::setSha512 );
180
164
persistentIntegrityMetaComponent .setPurl (result .getComponent ().getPurl ());
181
165
persistentIntegrityMetaComponent .setRepositoryUrl (result .getIntegrityMeta ().getMetaSourceUrl ());
182
166
persistentIntegrityMetaComponent .setPublishedAt (result .getIntegrityMeta ().hasCurrentVersionLastModified () ? new Date (result .getIntegrityMeta ().getCurrentVersionLastModified ().getSeconds () * 1000 ) : null );
0 commit comments