Skip to content

Commit

Permalink
ack catalog imported events (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
autodidaddict authored Mar 14, 2024
1 parent 264f8aa commit 76db652
Showing 1 changed file with 21 additions and 0 deletions.
21 changes: 21 additions & 0 deletions internal/globalservice/proj_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,30 @@ func (srv *GlobalService) updateAccountProjection(msg jetstream.Msg) {
initAccount(msg, from, kv)
case models.ContextBoundEventType:
recordContextBinding(msg, from, kv)
case models.CatalogImportedEventType:
recordCatalogImported(msg, from, kv)
}
}

func recordCatalogImported(msg jetstream.Msg, account string, kv jetstream.KeyValue) {
existingAccount, err := loadAccount(kv, account)
if err != nil {
slog.Error("Failed to load account corresponding to catalog import event", slog.Any("error", err))
_ = msg.Nak()
return
}
if existingAccount == nil {
slog.Error("A non-existent account attempted to record a catalog imported event")
_ = msg.Nak()
return
}

// At the moment, catalog imported does not affect the value of the projection
// NOOP

_ = msg.Ack()
}

func recordContextBinding(msg jetstream.Msg, account string, kv jetstream.KeyValue) {
existingAccount, err := loadAccount(kv, account)
if err != nil {
Expand Down

0 comments on commit 76db652

Please sign in to comment.