Connect reminder service to minder server to dispatch reminders #3630
Conversation
Force-pushed c444dfd to b5a1289
internal/reminder/reminder.go
Outdated
```go
// Commit the transaction i.e update reminder_last_sent
// only if the messages were sent successfully
if err = tx.Commit(); err != nil {
	logger.Error().Err(err).Msg("unable to commit transaction")
	errorSlice = append(errorSlice, err)
}
```
The `UpdateReminderLastSentById` queries are part of a transaction and are only committed if the messages are published successfully. Still, the transaction could fail, which would result in reminders being sent but the `reminder_last_sent` field not being updated (better than a false positive, i.e. `reminder_last_sent` was updated but reminders weren't sent).
I think this is fine. Did you consider splitting the message sending into a separate function, or returning early if `len(messages) == 0`? This might be a personal preference, but I find code with fewer indentation levels easier to read.
Something like:

```go
if len(messages) == 0 {
	return errorSlice
}
// the rest of the code
```
It seems like the `reminder_last_sent` field is mostly diagnostic, because we're actually ticking forward on the (in-memory) repository cursor, rather than using `reminder_last_sent` to actually dictate when repos are reconsidered. I think this is correct, but that suggests to me that strict update correctness is less necessary.
(I'm not arguing against keeping this data, but it seems like the actual mechanism that prevents re-evaluation is when the reminder is received and a rule evaluation is complete, which could be minutes after the reminder is sent on a particularly bad day.)
```go
// TODO: Collect Metrics
// Potential metrics:
// - Gauge: Number of reminders in the current batch
// - UpDownCounter: Average reminders sent per batch
// - Histogram: reminder_last_sent time distribution
```
How do we want to export the metrics? And do you have any thoughts on adding these metrics?
Hmm, I wonder what metrics would be useful as a minder operator.
About the histogram: I think a histogram showing how out of date (how far before the cutoff) a repo was when it was selected would be useful.
I think the metrics would mainly be interesting for fine-tuning the algorithm; did you see some other value in the metrics?
Since reminder is a separate service, I guess implementation-wise it should just have its own metrics server that can be scraped separately.
My $0.02 would be:
- Create a `MeterProvider` a la https://github.com/stacklok/minder/blob/main/internal/controlplane/server.go#L188.
- Wire the metrics into Prometheus. Since this has no external interface, you can simply set up an internal `http.Server` to serve the metrics -- we could put other debug information there later.
- If you call `otel.SetMeterProvider` in the server setup, you don't need to worry about plumbing directly from the code recording the metric to the server; OTel will handle this in the backend (with some globals, but that's fine in this case). That means that the metrics code can do the simple thing from the OTel docs:

```go
var timeShift api.Float64Histogram

func init() {
	meter := otel.GetMeterProvider().Meter("reminder") // Or could be ""
	var err error
	timeShift, err = meter.Float64Histogram("send_delay",
		api.WithDescription("Delay in revisit after eligible"),
		api.WithUnit("s"))
	if err != nil {
		panic(fmt.Sprintf("couldn't register meter: %v", err))
	}
}

// At the call site:
timeShift.Record(ctx, sendDelay) // No need for attributes
```
I'll add monitoring in a separate PR.
Speaking about metrics, `send_delay` would be defined as:

```go
cutoff := time.Now().Add(-1 * r.cfg.RecurrenceConfig.MinElapsed)
for _, repo := range repos {
	if t, ok := idToLastUpdate[repo.ID]; ok && t.Before(cutoff) {
		eligibleRepos = append(eligibleRepos, repo)
		// t is the last_update time for that repo
		// send_delay ≈ time.Now().Sub(t) - r.cfg.RecurrenceConfig.MinElapsed
	}
}
```

Now the `send_delay` time has nothing to do with `reminder_last_sent`. What metrics do we want to record with `reminder_last_sent`? `reminder_last_sent_delay` isn't useful, as the repo might not have been eligible in between, so it doesn't represent the algorithm's performance. I'm struggling to find the value of `reminder_last_sent` as a metric parameter.
What I was thinking of recording on each pass of collecting reminders would effectively be the following for each reminded row:

```
now() - reminder_last_sent
```

which would hopefully give a nice even distribution between `MinElapsed` and `MinElapsed + Interval` in normal operation. Unfortunately, common values might be `MinElapsed = 24h` and `Interval = 5m`, which will lead to a histogram where everything lands in a very large bucket. So, for practicality, I'd probably report this as:

```
now() - reminder_last_sent - MinElapsed
```

which should give a range of `0 .. Interval`. If the algorithm starts to get behind or reminders "bunch", we'd see the tail of the histogram get bigger and go beyond `Interval`. Given the target of `Interval = 5m` (hypothetical, but seems reasonable), we'd probably want to measure in seconds with a starting bucket size of 10s, roughly-doubling buckets (or a custom 1..5..10 bucketer), going out to at least 20 minutes, which would mean about 9 buckets (0, 10, 20, 40, 80, ... 1280). I'm less concerned about the exact bucket sizing than about being able to see whether most items are ending up within that `MinElapsed + Interval` window.
I'd store this metric in `UpdateReminderLastSentForRepositories`, so that the rest of the loop doesn't need to care, and it's next to the handling of `reminder_last_sent`.
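The roughly-doubling bucket scheme described above can be sketched as follows; the `doublingBuckets` helper and exact boundary values are illustrative, not something the PR defines.

```go
package main

import "fmt"

// doublingBuckets returns histogram bucket boundaries that start at
// `start` seconds and double `count` times, e.g. 10, 20, 40, ... seconds.
func doublingBuckets(start float64, count int) []float64 {
	bounds := make([]float64, 0, count)
	for b := start; len(bounds) < count; b *= 2 {
		bounds = append(bounds, b)
	}
	return bounds
}

func main() {
	// 8 boundaries starting at 10s reach 1280s (~21 minutes), matching
	// the "go out to at least 20 minutes" suggestion.
	fmt.Println(doublingBuckets(10, 8))
}
```

With `Interval = 5m`, most observations should land in the first few buckets; a growing tail past 300s would indicate reminders bunching.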
Force-pushed b5a1289 to d480686
The code looks quite good to me. I left replies to your comments; none of them are blocking. It wasn't clear to me whether you wanted to tackle the metrics as part of this PR, but I normally prefer smaller PRs.
It looks like the current architecture re-uses the existing events configuration for Minder, since it's assuming that the `repo.reminder.event` events are in a Watermill instance which Minder is configured to see. I think that's fine, but I didn't recall for sure where we'd landed on that earlier. This setup is 👍 for me from a simplicity PoV.
```go
// Project is the project that the event is relevant to
Project uuid.UUID `json:"project"`
// RepositoryID is id of the repository to be reconciled
RepositoryID int64 `json:"repository" validate:"gte=0"`
// ProviderID is the provider of the repository
ProviderID uuid.UUID `json:"provider"`
```
My preference would be to order these Project > Provider > Repo, which is the ownership order.
Done.
```go
// Project is the project that the event is relevant to
Project uuid.UUID `json:"project"`
// RepositoryID is id of the repository to be reconciled
RepositoryID int64 `json:"repository" validate:"gte=0"`
```
IMO, this should use the `id` field from the `repositories` table, not the `repo_id` from GitHub. That will help insulate this from GitHub-specific fields that are different in (for example) GitLab or Bitbucket in the future.
Additionally, `id` is the primary key. It looks like our current indexes are set up off `repo_id`; sorry for not noticing this previously...
Looking further, it looks like we use the GitHub-matching `repo_id` in `NewRepoReconcilerMessage` already, so that argues towards using it here as well.
Yupp, I used `RepositoryID` to represent the GitHub ID, as `Id` collided with the internal primary key.
```go
validate := validator.New()
if err := validate.Struct(&evt); err != nil {
	return nil, err
}
```
What's the benefit of calling `validate` and checking that the numeric `repo_id` is non-negative? It seems like the caller of this method will still need to check whether or not `repo_id` actually exists in the database, so this seems like it adds some extra code without really accomplishing the necessary validation; the necessary validation requires a database connection, and would presumably return the actual row if needed.
Yeah, no such benefit, removed it.
internal/reminder/reminder.go
Outdated
```go
	return []error{err}
}

defer tx.Rollback()
```
Do we need all of the following database reads and writes to go into a single transaction (but not the `getRepositoryBatch` call above in the transaction, or the `r.updateRepositoryCursor` in the outer `getRepositoryBatch` call)?
Removed the txn logic.
internal/reminder/reminder.go
Outdated
```go
	Time("previously", repo.ReminderLastSent.Time).
	Msg("updating reminder_last_sent")

err = qtx.UpdateReminderLastSentById(ctx, repo.ID)
```
Is it intentional that you update `reminder_last_sent` before you send the message?
...except that with transactions, this actually sort-of happens after the message was sent, too -- you may be taking out a lock on the repo row for the duration of the iteration. Transactions can make before/after behavior hard to reason about in some cases, so I'd be cautious about introducing them when they aren't needed or aren't producing specific guarantees.
Changed this.
internal/reminder/reminder.go
Outdated
```go
logger.Error().Err(err).Str("repo", repo.ID.String()).Msg("unable to update reminder_last_sent")
return []error{err}
errorSlice = append(errorSlice, err)
```
In general: either handle an error (by logging, recovering, etc.) or propagate it, but don't do both.
Logging the error and also propagating it often means errors end up double- or even triple-logged, as different parts of the stack log the error and then pass it up to be logged again in the next part of the stack.
Done.
internal/reminder/reminder.go
Outdated
```go
if err != nil {
	errorSlice = append(errorSlice, fmt.Errorf("error publishing messages: %w", err))
} else {
```
Rather than an `if/else` here, I'd generally prefer an early return, particularly since this seems like a terminal case.
More generally, I'm not convinced that there's a large benefit to batching the Publish messages, rather than sending each in the previous part of the loop.
Done.
> I'm not convinced that there's a large benefit to batching the Publish messages, rather than sending each in the previous part of the loop.

Bulk updates might be better than connecting to the db for every update, so I went with bulk updates with slight changes to the logic.
```go
func (rp *ReminderProcessor) reminderMessageHandler(msg *message.Message) error {
	evt, err := remindermessages.RepoReminderEventFromMessage(msg)
	if err != nil {
		return fmt.Errorf("error unmarshalling reminder event: %w", err)
	}

	log.Info().Msgf("Received reminder event: %v", evt)

	repoReconcileMsg, err := reconcilermessages.NewRepoReconcilerMessage(evt.ProviderID, evt.RepositoryID, evt.Project)
	if err != nil {
		return fmt.Errorf("error creating repo reconcile event: %w", err)
	}

	// This is a non-fatal error, so we'll just log it and continue with the next ones
	if err := rp.evt.Publish(events.TopicQueueReconcileRepoInit, repoReconcileMsg); err != nil {
		log.Printf("error publishing reconciler event: %v", err)
	}
	return nil
}
```
It feels like this is simply forwarding messages from one queue to another; why not simply send the `TopicQueueReconcileRepoInit` message directly from `reminder.go` and skip this extra intermediary? (There might be a good reason, but we should encapsulate that with a comment if there is such a reason.)
I'm open to removing this processor and sending events directly. But I think we initially discussed adding a new topic for reminder to have some distinction (which, now that I think about it, cannot be distinguished internally; we don't have any form of tracing for reconciliations started by reminder).
In #4075, I'm starting to look at switching from Watermill+SQL to CloudEvents + NATS, which has a somewhat more formalized envelope structure. I think we could probably have some sort of envelope property that distinguishes between the two, but put both messages onto the same topic -- the reader would be able to record which source the `init` event came from, but would generally handle "new" and "remind" the same.
Force-pushed d480686 to 77cb1b2
Signed-off-by: Vyom-Yadav <[email protected]>
Force-pushed 77cb1b2 to f36d3de
```go
err = r.store.UpdateReminderLastSentForRepositories(ctx, repoIds)
if err != nil {
	return fmt.Errorf("reminders published but error updating last sent time: %w", err)
```
Bulk updates for all repos are fine, IMO. We don't care about a `ms` difference; the only value of this metric is in minutes/hours, to make sure reconciliation is happening at a regular interval and we can reconcile every repo in like 24 hours or so.
```go
repoReconcileMessage, err := remindermessages.NewRepoReminderMessage(
	repo.ProviderID, repo.RepoID, repo.ProjectID,
)
if err != nil {
	return nil, fmt.Errorf("error creating reminder message: %w", err)
}
```
We now stop if any JSON marshalling fails. I expect this to be deterministic, so if one is failing, others will fail too.
Sorry for the delay on this. Somehow, all weekends got occupied with one thing or the other :)
No problem, I've been in-and-out the last few weeks anyway. I'll try to take a look at this later today or tomorrow.
@evankanderson not found (404), where are you 😆
Sorry for disappearing! I had a week of vacation and a week of conferences, and when I got done, I had no idea what had happened before that time.
This is looking pretty good. I'm happy to add it as-is for now and figure out the next steps in a subsequent PR. I did add a couple of comments, but I don't think they block merging this.
```go
// RepositoryID is id of the repository to be reconciled
RepositoryID int64 `json:"repository"`
```
Sorry for leaving this long enough that the underlying ground has changed a little, but you may want to make this an "entity id", so that we can remind about things other than repositories (artifacts, PRs, etc.).
It's fine to do this in a subsequent PR if you prefer, since we're still getting Reminder booted up.
```go
func NewRepoReminderMessage(providerId uuid.UUID, repoID int64, projectID uuid.UUID) (*message.Message, error) {
	evt := &RepoReminderEvent{
		Project:      projectID,
		ProviderID:   providerId,
		RepositoryID: repoID,
	}

	evtStr, err := json.Marshal(evt)
	if err != nil {
		return nil, fmt.Errorf("error marshalling repo reminder event: %w", err)
	}

	msg := message.NewMessage(uuid.New().String(), evtStr)
	return msg, nil
}
```
It feels like it would make more sense to have a `(e RepoReminderEvent) Marshal` method, so you might use it as:

```go
msg, err := RepoReminderEvent{projectID, providerID, repoID}.Marshal()
```

Since we don't need to validate any of the arguments to the message (it's just a data object), there's no need for a constructor.
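A sketch of what that `Marshal` method could look like; the `uuid.UUID` fields are simplified to plain strings here so the example stays dependency-free, which is an assumption, not the PR's actual types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RepoReminderEvent mirrors the PR's payload shape, with the UUID
// fields shown as plain strings for this sketch.
type RepoReminderEvent struct {
	Project      string `json:"project"`
	ProviderID   string `json:"provider"`
	RepositoryID int64  `json:"repository"`
}

// Marshal serializes the event; since the struct is a plain data
// object, no validating constructor is needed.
func (e RepoReminderEvent) Marshal() ([]byte, error) {
	return json.Marshal(e)
}

func main() {
	payload, err := RepoReminderEvent{
		Project:      "proj-1234",
		ProviderID:   "prov-5678",
		RepositoryID: 42,
	}.Marshal()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))
}
```

Callers construct the struct with named fields and call `Marshal()` directly, which keeps the data object and its serialization in one place.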
```go
}

// RepoReminderEventFromMessage creates a new repo reminder event from a message
func RepoReminderEventFromMessage(msg *message.Message) (*RepoReminderEvent, error) {
```
I think this is okay as a constructor, though it only saves about 1 line of code elsewhere, because the caller will need to handle the error anyway.
Summary
Issue #2262 (Task - I)
Adds logic for sending reminders to the minder server. The minder server processes these reminders to trigger a `TopicQueueReconcileRepoInit` event to reconcile a repository.

Change Type
Testing
Tested locally (no unit tests)
cc @evankanderson