Skip to content

Commit 0831b93

Browse files
committed
db: Add stream update support for enrichments
Signed-off-by: J. Victor Martins <[email protected]>
1 parent e5d5c8d commit 0831b93

File tree

3 files changed

+54
-8
lines changed

3 files changed

+54
-8
lines changed

datastore/enrichment.go

+6
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,19 @@ import (
88
"github.com/quay/claircore/libvuln/driver"
99
)
1010

11+
// EnrichmentIter is an [Iter] of enrichment records.
12+
type EnrichmentIter Iter[*driver.EnrichmentRecord]
13+
1114
// EnrichmentUpdater is an interface exporting the necessary methods
1215
// for storing and querying Enrichments.
1316
type EnrichmentUpdater interface {
1417
// UpdateEnrichments creates a new EnrichmentUpdateOperation, inserts the provided
1518
// EnrichmentRecord(s), and ensures enrichments from previous updates are not
1619
// queries by clients.
1720
UpdateEnrichments(ctx context.Context, kind string, fingerprint driver.Fingerprint, enrichments []driver.EnrichmentRecord) (uuid.UUID, error)
21+
// UpdateEnrichmentsIter performs the same operation as UpdateEnrichments, but
22+
// accepting an iterator function.
23+
UpdateEnrichmentsIter(ctx context.Context, kind string, fingerprint driver.Fingerprint, enIter EnrichmentIter) (uuid.UUID, error)
1824
}
1925

2026
// Enrichment is an interface for querying enrichments from the store.

datastore/postgres/enrichment.go

+38-8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/prometheus/client_golang/prometheus/promauto"
1818
"github.com/quay/zlog"
1919

20+
"github.com/quay/claircore/datastore"
2021
"github.com/quay/claircore/libvuln/driver"
2122
"github.com/quay/claircore/pkg/microbatch"
2223
)
@@ -60,10 +61,27 @@ var (
6061
)
6162
)
6263

64+
func (s *MatcherStore) UpdateEnrichmentsIter(ctx context.Context, updater string, fp driver.Fingerprint, it datastore.EnrichmentIter) (uuid.UUID, error) {
65+
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateEnrichmentsIter")
66+
return s.updateEnrichments(ctx, updater, fp, it)
67+
}
68+
6369
// UpdateEnrichments creates a new UpdateOperation, inserts the provided
6470
// EnrichmentRecord(s), and ensures enrichments from previous updates are not
6571
// queried by clients.
66-
func (s *MatcherStore) UpdateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) {
72+
func (s *MatcherStore) UpdateEnrichments(ctx context.Context, updater string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) {
73+
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateEnrichments")
74+
enIter := func(yield func(record *driver.EnrichmentRecord, err error) bool) {
75+
for i := range es {
76+
if !yield(&es[i], nil) {
77+
break
78+
}
79+
}
80+
}
81+
return s.updateEnrichments(ctx, updater, fp, enIter)
82+
}
83+
84+
func (s *MatcherStore) updateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, it datastore.EnrichmentIter) (uuid.UUID, error) {
6785
const (
6886
create = `
6987
INSERT
@@ -134,17 +152,29 @@ DO
134152

135153
batch := microbatch.NewInsert(tx, 2000, time.Minute)
136154
start = time.Now()
137-
for i := range es {
138-
hashKind, hash := hashEnrichment(&es[i])
139-
err := batch.Queue(ctx, insert,
140-
hashKind, hash, name, es[i].Tags, es[i].Enrichment,
155+
enCt := 0
156+
it(func(en *driver.EnrichmentRecord, iterErr error) bool {
157+
if iterErr != nil {
158+
err = iterErr
159+
return false
160+
}
161+
enCt++
162+
hashKind, hash := hashEnrichment(en)
163+
err = batch.Queue(ctx, insert,
164+
hashKind, hash, name, en.Tags, en.Enrichment,
141165
)
142166
if err != nil {
143-
return uuid.Nil, fmt.Errorf("failed to queue enrichment: %w", err)
167+
err = fmt.Errorf("failed to queue enrichment: %w", err)
168+
return false
144169
}
145170
if err := batch.Queue(ctx, assoc, hashKind, hash, name, id); err != nil {
146-
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err)
171+
err = fmt.Errorf("failed to queue association: %w", err)
172+
return false
147173
}
174+
return true
175+
})
176+
if err != nil {
177+
return uuid.Nil, fmt.Errorf("iterating on enrichments: %w", err)
148178
}
149179
if err := batch.Done(ctx); err != nil {
150180
return uuid.Nil, fmt.Errorf("failed to finish batch enrichment insert: %w", err)
@@ -160,7 +190,7 @@ DO
160190
}
161191
zlog.Debug(ctx).
162192
Stringer("ref", ref).
163-
Int("inserted", len(es)).
193+
Int("inserted", enCt).
164194
Msg("update_operation committed")
165195
return ref, nil
166196
}

libvuln/jsonblob/jsonblob.go

+10
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,16 @@ func (s *Store) DeltaUpdateVulnerabilities(ctx context.Context, updater string,
455455
return uuid.Nil, nil
456456
}
457457

458+
// UpdateEnrichmentsIter is unimplemented.
459+
func (s *Store) UpdateEnrichmentsIter(_ context.Context, _ string, _ driver.Fingerprint, _ datastore.EnrichmentIter) (uuid.UUID, error) {
460+
return uuid.Nil, errors.ErrUnsupported
461+
}
462+
463+
// UpdateVulnerabilitiesIter is unimplemented.
464+
func (s *Store) UpdateVulnerabilitiesIter(_ context.Context, _ string, _ driver.Fingerprint, _ datastore.VulnerabilityIter) (uuid.UUID, error) {
465+
return uuid.Nil, errors.ErrUnsupported
466+
}
467+
458468
var bufPool sync.Pool
459469

460470
func getBuf() []byte {

0 commit comments

Comments
 (0)