3
3
import alpine .common .logging .Logger ;
4
4
import com .github .packageurl .MalformedPackageURLException ;
5
5
import com .github .packageurl .PackageURL ;
6
- import org .apache .commons .lang3 .exception . ExceptionUtils ;
6
+ import org .apache .commons .lang3 .StringUtils ;
7
7
import org .apache .kafka .clients .consumer .ConsumerRecord ;
8
8
import org .dependencytrack .event .kafka .processor .api .Processor ;
9
9
import org .dependencytrack .event .kafka .processor .exception .ProcessingException ;
13
13
import org .dependencytrack .model .RepositoryType ;
14
14
import org .dependencytrack .persistence .QueryManager ;
15
15
import org .dependencytrack .proto .repometaanalysis .v1 .AnalysisResult ;
16
- import org .postgresql .util .PSQLState ;
16
+ import org .dependencytrack .util .PersistenceUtil ;
17
17
18
- import javax .jdo .JDODataStoreException ;
19
18
import javax .jdo .PersistenceManager ;
20
19
import javax .jdo .Query ;
21
- import javax .jdo .Transaction ;
22
- import java .sql .SQLException ;
23
20
import java .util .Date ;
24
21
import java .util .Optional ;
25
22
@@ -61,83 +58,70 @@ private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager q
61
58
}
62
59
}
63
60
64
- private void synchronizeRepositoryMetadata (final QueryManager queryManager , final ConsumerRecord <String , AnalysisResult > record ) throws Exception {
65
- PersistenceManager pm = queryManager .getPersistenceManager ();
61
+ private void synchronizeRepositoryMetadata (final QueryManager qm , final ConsumerRecord <String , AnalysisResult > record ) throws Exception {
62
+ final PersistenceManager pm = qm .getPersistenceManager ();
66
63
final AnalysisResult result = record .value ();
67
- PackageURL purl = new PackageURL (result .getComponent ().getPurl ());
64
+ final var purl = new PackageURL (result .getComponent ().getPurl ());
68
65
69
66
// It is possible that the same meta info is reported for multiple components in parallel,
70
67
// causing unique constraint violations when attempting to insert into the REPOSITORY_META_COMPONENT table.
71
68
// In such cases, we can get away with simply retrying to SELECT+UPDATE or INSERT again. We'll attempt
72
69
// up to 3 times before giving up.
73
- for (int i = 0 ; i < 3 ; i ++) {
74
- final Transaction trx = pm .currentTransaction ();
75
- try {
76
- RepositoryMetaComponent repositoryMetaComponentResult = createRepositoryMetaResult (record , pm , purl );
77
- if (repositoryMetaComponentResult != null ) {
78
- trx .begin ();
79
- pm .makePersistent (repositoryMetaComponentResult );
80
- trx .commit ();
81
- break ; // this means that transaction was successful and we do not need to retry
82
- }
83
- } catch (JDODataStoreException e ) {
84
- // TODO: DataNucleus doesn't map constraint violation exceptions very well,
85
- // so we have to depend on the exception of the underlying JDBC driver to
86
- // tell us what happened. We currently only handle PostgreSQL, but we'll have
87
- // to do the same for at least H2 and MSSQL.
88
- if (ExceptionUtils .getRootCause (e ) instanceof final SQLException se
89
- && PSQLState .UNIQUE_VIOLATION .getState ().equals (se .getSQLState ())) {
90
- continue ; // Retry
91
- }
92
-
93
- throw e ;
94
- } finally {
95
- if (trx .isActive ()) {
96
- trx .rollback ();
97
- }
70
+ qm .runInRetryableTransaction (() -> {
71
+ final RepositoryMetaComponent repositoryMetaComponentResult = createRepositoryMetaResult (record , pm , purl );
72
+ if (repositoryMetaComponentResult != null ) {
73
+ pm .makePersistent (repositoryMetaComponentResult );
98
74
}
99
- }
75
+
76
+ return null ;
77
+ }, PersistenceUtil ::isUniqueConstraintViolation );
100
78
}
101
79
102
- private RepositoryMetaComponent createRepositoryMetaResult (ConsumerRecord <String , AnalysisResult > incomingAnalysisResultRecord , PersistenceManager pm , PackageURL purl ) throws Exception {
80
+ private RepositoryMetaComponent createRepositoryMetaResult (ConsumerRecord <String , AnalysisResult > incomingAnalysisResultRecord , PersistenceManager pm , PackageURL purl ) {
103
81
final AnalysisResult result = incomingAnalysisResultRecord .value ();
104
- if (result .hasLatestVersion ()) {
105
- try (final Query <RepositoryMetaComponent > query = pm .newQuery (RepositoryMetaComponent .class )) {
106
- query .setFilter ("repositoryType == :repositoryType && namespace == :namespace && name == :name" );
107
- query .setParameters (
108
- RepositoryType .resolve (purl ),
109
- purl .getNamespace (),
110
- purl .getName ()
111
- );
112
- RepositoryMetaComponent persistentRepoMetaComponent = query .executeUnique ();
113
- if (persistentRepoMetaComponent == null ) {
114
- persistentRepoMetaComponent = new RepositoryMetaComponent ();
115
- }
116
-
117
- if (persistentRepoMetaComponent .getLastCheck () != null
118
- && persistentRepoMetaComponent .getLastCheck ().after (new Date (incomingAnalysisResultRecord .timestamp ()))) {
119
- LOGGER .warn ("""
120
- Received repository meta information for %s that is older\s
121
- than what's already in the database; Discarding
122
- """ .formatted (purl ));
123
- return null ;
124
- }
125
-
126
- persistentRepoMetaComponent .setRepositoryType (RepositoryType .resolve (purl ));
127
- persistentRepoMetaComponent .setNamespace (purl .getNamespace ());
128
- persistentRepoMetaComponent .setName (purl .getName ());
129
- if (result .hasLatestVersion ()) {
130
- persistentRepoMetaComponent .setLatestVersion (result .getLatestVersion ());
131
- }
132
- if (result .hasPublished ()) {
133
- persistentRepoMetaComponent .setPublished (new Date (result .getPublished ().getSeconds () * 1000 ));
134
- }
135
- persistentRepoMetaComponent .setLastCheck (new Date (incomingAnalysisResultRecord .timestamp ()));
136
- return persistentRepoMetaComponent ;
137
- }
138
- } else {
82
+ if (!result .hasLatestVersion ()) {
83
+ return null ;
84
+ }
85
+
86
+ final Query <RepositoryMetaComponent > query = pm .newQuery (RepositoryMetaComponent .class );
87
+ query .setFilter ("repositoryType == :repositoryType && namespace == :namespace && name == :name" );
88
+ query .setParameters (
89
+ RepositoryType .resolve (purl ),
90
+ purl .getNamespace (),
91
+ purl .getName ()
92
+ );
93
+
94
+ RepositoryMetaComponent persistentRepoMetaComponent ;
95
+ try {
96
+ persistentRepoMetaComponent = query .executeUnique ();
97
+ } finally {
98
+ query .closeAll ();
99
+ }
100
+
101
+ if (persistentRepoMetaComponent == null ) {
102
+ persistentRepoMetaComponent = new RepositoryMetaComponent ();
103
+ }
104
+
105
+ if (persistentRepoMetaComponent .getLastCheck () != null
106
+ && persistentRepoMetaComponent .getLastCheck ().after (new Date (incomingAnalysisResultRecord .timestamp ()))) {
107
+ LOGGER .warn ("""
108
+ Received repository meta information for %s that is older\s
109
+ than what's already in the database; Discarding
110
+ """ .formatted (purl ));
139
111
return null ;
140
112
}
113
+
114
+ persistentRepoMetaComponent .setRepositoryType (RepositoryType .resolve (purl ));
115
+ persistentRepoMetaComponent .setNamespace (purl .getNamespace ());
116
+ persistentRepoMetaComponent .setName (purl .getName ());
117
+ if (result .hasLatestVersion ()) {
118
+ persistentRepoMetaComponent .setLatestVersion (result .getLatestVersion ());
119
+ }
120
+ if (result .hasPublished ()) {
121
+ persistentRepoMetaComponent .setPublished (new Date (result .getPublished ().getSeconds () * 1000 ));
122
+ }
123
+ persistentRepoMetaComponent .setLastCheck (new Date (incomingAnalysisResultRecord .timestamp ()));
124
+ return persistentRepoMetaComponent ;
141
125
}
142
126
143
127
private IntegrityMetaComponent synchronizeIntegrityMetaResult (final ConsumerRecord <String , AnalysisResult > incomingAnalysisResultRecord , QueryManager queryManager , PackageURL purl ) {
@@ -155,10 +139,10 @@ private IntegrityMetaComponent synchronizeIntegrityMetaResult(final ConsumerReco
155
139
156
140
if (result .getIntegrityMeta ().hasMd5 () || result .getIntegrityMeta ().hasSha1 () || result .getIntegrityMeta ().hasSha256 ()
157
141
|| result .getIntegrityMeta ().hasSha512 () || result .getIntegrityMeta ().hasCurrentVersionLastModified ()) {
158
- Optional .ofNullable (result .getIntegrityMeta ().getMd5 ()).ifPresent (persistentIntegrityMetaComponent ::setMd5 );
159
- Optional .ofNullable (result .getIntegrityMeta ().getSha1 ()).ifPresent (persistentIntegrityMetaComponent ::setSha1 );
160
- Optional .ofNullable (result .getIntegrityMeta ().getSha256 ()).ifPresent (persistentIntegrityMetaComponent ::setSha256 );
161
- Optional .ofNullable (result .getIntegrityMeta ().getSha512 ()).ifPresent (persistentIntegrityMetaComponent ::setSha512 );
142
+ Optional .of (result .getIntegrityMeta ().getMd5 ()). filter ( StringUtils :: isNotBlank ).ifPresent (persistentIntegrityMetaComponent ::setMd5 );
143
+ Optional .of (result .getIntegrityMeta ().getSha1 ()). filter ( StringUtils :: isNotBlank ).ifPresent (persistentIntegrityMetaComponent ::setSha1 );
144
+ Optional .of (result .getIntegrityMeta ().getSha256 ()). filter ( StringUtils :: isNotBlank ).ifPresent (persistentIntegrityMetaComponent ::setSha256 );
145
+ Optional .of (result .getIntegrityMeta ().getSha512 ()). filter ( StringUtils :: isNotBlank ).ifPresent (persistentIntegrityMetaComponent ::setSha512 );
162
146
persistentIntegrityMetaComponent .setPurl (result .getComponent ().getPurl ());
163
147
persistentIntegrityMetaComponent .setRepositoryUrl (result .getIntegrityMeta ().getMetaSourceUrl ());
164
148
persistentIntegrityMetaComponent .setPublishedAt (result .getIntegrityMeta ().hasCurrentVersionLastModified () ? new Date (result .getIntegrityMeta ().getCurrentVersionLastModified ().getSeconds () * 1000 ) : null );
0 commit comments