Skip to content

Commit e5d5c8d

Browse files
committed
db: Add stream update support for vulns
Signed-off-by: J. Victor Martins <[email protected]>
1 parent 5ae53b8 commit e5d5c8d

File tree

3 files changed

+73
-16
lines changed

3 files changed

+73
-16
lines changed

datastore/matcher_store.go

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
package datastore
22

3+
// Iter is an iterator function that accepts a callback 'yield' to handle each
4+
// iterator item. The consumer can signal the iterator to break or retry by
5+
// returning an error. The iterator itself returns an error if the iteration
6+
// cannot continue or was interrupted unexpectedly.
7+
type Iter[T any] func(yield func(T, error) bool)
8+
39
// MatcherStore aggregates all interface types
410
type MatcherStore interface {
511
Updater

datastore/postgres/updatevulnerabilities.go

+61-16
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/quay/zlog"
1616

1717
"github.com/quay/claircore"
18+
"github.com/quay/claircore/datastore"
1819
"github.com/quay/claircore/libvuln/driver"
1920
"github.com/quay/claircore/pkg/microbatch"
2021
)
@@ -45,14 +46,27 @@ var (
4546
)
4647
)
4748

49+
// UpdateVulnerabilitiesIter implements vulnstore.Updater.
50+
func (s *MatcherStore) UpdateVulnerabilitiesIter(ctx context.Context, updater string, fp driver.Fingerprint, it datastore.VulnerabilityIter) (uuid.UUID, error) {
51+
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilitiesIter")
52+
return s.updateVulnerabilities(ctx, updater, fp, it, nil)
53+
}
54+
4855
// UpdateVulnerabilities implements vulnstore.Updater.
4956
//
5057
// It creates a new UpdateOperation for this update call, inserts the
5158
// provided vulnerabilities and computes a diff comprising the removed
5259
// and added vulnerabilities for this UpdateOperation.
53-
func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) {
60+
func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fp driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) {
5461
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilities")
55-
return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, nil, false)
62+
iterVulns := func(yield func(*claircore.Vulnerability, error) bool) {
63+
for i := range vulns {
64+
if !yield(vulns[i], nil) {
65+
break
66+
}
67+
}
68+
}
69+
return s.updateVulnerabilities(ctx, updater, fp, iterVulns, nil)
5670
}
5771

5872
// DeltaUpdateVulnerabilities implements vulnstore.Updater.
@@ -68,10 +82,24 @@ func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string
6882
// - Associate new vulnerabilities with new updateOperation
6983
func (s *MatcherStore) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string) (uuid.UUID, error) {
7084
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.DeltaUpdateVulnerabilities")
71-
return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, deletedVulns, true)
85+
iterVulns := func(yield func(*claircore.Vulnerability, error) bool) {
86+
for i := range vulns {
87+
if !yield(vulns[i], nil) {
88+
break
89+
}
90+
}
91+
}
92+
delVulns := func(yield func(string, error) bool) {
93+
for _, s := range deletedVulns {
94+
if !yield(s, nil) {
95+
break
96+
}
97+
}
98+
}
99+
return s.updateVulnerabilities(ctx, updater, fingerprint, iterVulns, delVulns)
72100
}
73101

74-
func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string, delta bool) (uuid.UUID, error) {
102+
func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter datastore.VulnerabilityIter, delIter datastore.Iter[string]) (uuid.UUID, error) {
75103
const (
76104
// Create makes a new update operation and returns the reference and ID.
77105
create = `INSERT INTO update_operation (updater, fingerprint, kind) VALUES ($1, $2, 'vulnerability') RETURNING id, ref;`
@@ -139,6 +167,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
139167
return uuid.Nil, fmt.Errorf("failed to create update_operation: %w", err)
140168
}
141169

170+
delta := delIter != nil
142171
updateVulnerabilitiesCounter.WithLabelValues("create", strconv.FormatBool(delta)).Add(1)
143172
updateVulnerabilitiesDuration.WithLabelValues("create", strconv.FormatBool(delta)).Observe(time.Since(start).Seconds())
144173

@@ -181,18 +210,20 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
181210
}
182211

183212
if len(oldVulns) > 0 {
184-
for _, v := range vulns {
213+
vulnIter(func(v *claircore.Vulnerability, _ error) bool {
185214
// If we have an existing vuln in the new batch
186215
// delete it from the oldVulns map so it doesn't
187216
// get associated with the new update_operation.
188217
delete(oldVulns, v.Name)
189-
}
190-
for _, delName := range deletedVulns {
218+
return true
219+
})
220+
delIter(func(delName string, _ error) bool {
191221
// If we have an existing vuln that has been signaled
192222
// as deleted by the updater then delete it so it doesn't
193223
// get associated with the new update_operation.
194224
delete(oldVulns, delName)
195-
}
225+
return true
226+
})
196227
}
197228
start = time.Now()
198229
// Associate already existing vulnerabilities with new update_operation.
@@ -211,14 +242,20 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
211242

212243
// batch insert vulnerabilities
213244
skipCt := 0
214-
245+
vulnCt := 0
215246
start = time.Now()
216247

217248
mBatcher := microbatch.NewInsert(tx, 2000, time.Minute)
218-
for _, vuln := range vulns {
249+
250+
vulnIter(func(vuln *claircore.Vulnerability, iterErr error) bool {
251+
if iterErr != nil {
252+
err = iterErr
253+
return false
254+
}
255+
vulnCt++
219256
if vuln.Package == nil || vuln.Package.Name == "" {
220257
skipCt++
221-
continue
258+
return true
222259
}
223260

224261
pkg := vuln.Package
@@ -233,7 +270,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
233270
hashKind, hash := md5Vuln(vuln)
234271
vKind, vrLower, vrUpper := rangefmt(vuln.Range)
235272

236-
err := mBatcher.Queue(ctx, insert,
273+
err = mBatcher.Queue(ctx, insert,
237274
hashKind, hash,
238275
vuln.Name, vuln.Updater, vuln.Description, vuln.Issued, vuln.Links, vuln.Severity, vuln.NormalizedSeverity,
239276
pkg.Name, pkg.Version, pkg.Module, pkg.Arch, pkg.Kind,
@@ -242,12 +279,20 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
242279
vuln.FixedInVersion, vuln.ArchOperation, vKind, vrLower, vrUpper,
243280
)
244281
if err != nil {
245-
return uuid.Nil, fmt.Errorf("failed to queue vulnerability: %w", err)
282+
err = fmt.Errorf("failed to queue vulnerability: %w", err)
283+
return false
246284
}
247285

248-
if err := mBatcher.Queue(ctx, assoc, hashKind, hash, uoID); err != nil {
249-
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err)
286+
err = mBatcher.Queue(ctx, assoc, hashKind, hash, uoID)
287+
if err != nil {
288+
err = fmt.Errorf("failed to queue association: %w", err)
289+
return false
250290
}
291+
292+
return true
293+
})
294+
if err != nil {
295+
return uuid.Nil, fmt.Errorf("iterating on vulnerabilities: %w", err)
251296
}
252297
if err := mBatcher.Done(ctx); err != nil {
253298
return uuid.Nil, fmt.Errorf("failed to finish batch vulnerability insert: %w", err)
@@ -266,7 +311,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
266311
zlog.Debug(ctx).
267312
Str("ref", ref.String()).
268313
Int("skipped", skipCt).
269-
Int("inserted", len(vulns)-skipCt).
314+
Int("inserted", vulnCt-skipCt).
270315
Msg("update_operation committed")
271316
return ref, nil
272317
}

datastore/updater.go

+6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ import (
1010
"github.com/quay/claircore/libvuln/driver"
1111
)
1212

13+
// VulnerabilityIter is an [Iter] of vulnerabilities.
14+
type VulnerabilityIter Iter[*claircore.Vulnerability]
15+
1316
// Updater is an interface exporting the necessary methods
1417
// for updating a vulnerability database.
1518
type Updater interface {
@@ -19,6 +22,9 @@ type Updater interface {
1922
// vulnerabilities, and ensures vulnerabilities from previous updates are
2023
// not queried by clients.
2124
UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error)
25+
// UpdateVulnerabilitiesIter performs the same operation as
26+
// UpdateVulnerabilities, but accepting an iterator function.
27+
UpdateVulnerabilitiesIter(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter VulnerabilityIter) (uuid.UUID, error)
2228
// DeltaUpdateVulnerabilities creates a new UpdateOperation consisting of existing
2329
// vulnerabilities and new vulnerabilities. It also takes an array of deleted
2430
// vulnerability names which should no longer be available to query.

0 commit comments

Comments
 (0)