From a5650b77d29e928cf9c52e91f5c8f4197654498a Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Mon, 24 Feb 2025 18:09:28 +0545 Subject: [PATCH] feat: process NotificationStatusAttemptingFallback --- go.mod | 2 +- go.sum | 2 + jobs/jobs.go | 4 ++ notification/events.go | 28 +++++++++++- notification/job.go | 83 +++++++++++++++++++++++++++++++++++- notification/notification.go | 19 ++++++++- 6 files changed, 134 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 6035cc7f6..453ead1fd 100644 --- a/go.mod +++ b/go.mod @@ -368,7 +368,7 @@ require ( sigs.k8s.io/yaml v1.4.0 ) -replace github.com/flanksource/duty => ../duty +// replace github.com/flanksource/duty => ../duty // replace github.com/flanksource/gomplate/v3 => ../gomplat3 diff --git a/go.sum b/go.sum index 9c6db5a54..d68cff9a9 100644 --- a/go.sum +++ b/go.sum @@ -287,6 +287,8 @@ github.com/flanksource/artifacts v1.0.14 h1:Vv70bccsae0MwGaf/uSPp34J5V1/PyKfct9z github.com/flanksource/artifacts v1.0.14/go.mod h1:qHVCnQu5k50aWNJ5UhpcAKEl7pAzqUrFFKGSm147G70= github.com/flanksource/commons v1.36.1 h1:SDppXOYO6vk06d12SPOYVZJX+8gV72FmK1l9ar5fkjc= github.com/flanksource/commons v1.36.1/go.mod h1:nsYCn6JOhENXNLR5pz4zNco70DQV+bGObQ8NbOrUeuQ= +github.com/flanksource/duty v1.0.862 h1:VsqkaEKjxezMordMu3YZNd64sWH0/GWvE4uD98ScPWY= +github.com/flanksource/duty v1.0.862/go.mod h1:5NmEw4b/vz08vXNR4WPuHWNtD6YB2pGu2RrX1HrDT8E= github.com/flanksource/gomplate/v3 v3.24.55 h1:BW+KeMcggCkNawb4VTbY+Zn3s9eYIhwqb5/myiyJRb8= github.com/flanksource/gomplate/v3 v3.24.55/go.mod h1:hGEObNtnOQs8rNUX8sM8aJTAhnt4ehjyOw1MvDhl6AU= github.com/flanksource/is-healthy v1.0.62 h1:wa4Eq3YR1+Ku3UhKlVpos0CqieGeWWVF5gZeOsmDAIg= diff --git a/jobs/jobs.go b/jobs/jobs.go index da2f52d9e..04e6d6f80 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -72,6 +72,10 @@ func Start(ctx context.Context) { shutdown.ShutdownAndExit(1, fmt.Sprintf("failed to schedule job ProcessFallbackCheckNotificationsJob: %v", err)) } + if err := notification.ProcessFallbackNotificationsJob(ctx).AddToScheduler(FuncScheduler); err != nil { + shutdown.ShutdownAndExit(1, fmt.Sprintf("failed to schedule job ProcessFallbackNotificationsJob: %v", err)) + } + if err := notification.ProcessPendingNotificationsJob(ctx).AddToScheduler(FuncScheduler); err != nil { logger.Errorf("failed to schedule job: %v", err) } diff --git a/notification/events.go b/notification/events.go index 9b0d948da..2bdd11e07 100644 --- a/notification/events.go +++ b/notification/events.go @@ -268,6 +268,31 @@ func sendNotifications(ctx context.Context, events models.Events) models.Events return failedEvents } +func sendFallbackNotification(ctx context.Context, sendHistory models.NotificationSendHistory) error { + notif, err := GetNotification(ctx, sendHistory.NotificationID.String()) + if err != nil { + return fmt.Errorf("failed to get notification[%s]: %w", sendHistory.NotificationID, err) + } + + var payload NotificationEventPayload + payload.FromMap(sendHistory.Payload) + + payload.PersonID = notif.FallbackPersonID + payload.TeamID = notif.FallbackTeamID + payload.PlaybookID = notif.FallbackPlaybookID + payload.CustomService = notif.FallbackCustomNotification + + if err := sendPendingNotification(ctx, sendHistory, payload); err != nil { + return fmt.Errorf("failed to send notification: %w", err) + } else if dberr := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id = ?", sendHistory.ID).UpdateColumns(map[string]any{ + "status": models.NotificationStatusSent, + }).Error; dberr != nil { + return fmt.Errorf("failed to save notification status as sent: %w", dberr) + } + + return nil +} + func sendPendingNotification(ctx context.Context, history models.NotificationSendHistory, payload NotificationEventPayload) error { notificationContext := NewContext(ctx, payload.NotificationID).WithHistory(history) @@ -396,12 +421,13 @@ func _sendNotification(ctx *Context, noWait bool, payload NotificationEventPaylo return fmt.Errorf("failed to get notification: %w", err) } + ctx.log.Payload = payload.AsMap() + if !noWait && nn.WaitFor != nil { // delayed notifications are saved to history with a pending status // and are later consumed by a job. ctx.log.NotBefore = lo.ToPtr(ctx.log.CreatedAt.Add(*nn.WaitFor)) ctx.log.Status = models.NotificationStatusPending - ctx.log.Payload = payload.AsMap() } else { if payload.PlaybookID != nil { if err := triggerPlaybookRun(ctx, celEnv, *payload.PlaybookID); err != nil { diff --git a/notification/job.go b/notification/job.go index d67e15949..a3837ab14 100644 --- a/notification/job.go +++ b/notification/job.go @@ -311,7 +311,7 @@ func ProcessFallbackCheckNotificationsJob(ctx context.Context) *job.Job { RunNow: true, Context: ctx, Singleton: false, - Schedule: "@every 15s", + Schedule: "@every 30s", Fn: func(ctx job.JobRuntime) error { var errCount int for { @@ -383,6 +383,7 @@ func ProcessFallbackCheckNotifications(parentCtx context.Context) (bool, error) if err := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id IN ?", shouldFallback).UpdateColumns(map[string]any{ "status": models.NotificationStatusAttemptingFallback, "is_fallback": true, + "not_before": gorm.Expr("CASE WHEN not_before IS NULL THEN NOW() + INTERVAL '1 MINUTE' ELSE not_before + INTERVAL '1 minute' END"), }).Error; err != nil { return fmt.Errorf("failed to update notification status to attempting fallback: %w", err) } @@ -403,6 +404,86 @@ func ProcessFallbackCheckNotifications(parentCtx context.Context) (bool, error) return noMorePending, err } +func ProcessFallbackNotificationsJob(ctx context.Context) *job.Job { + return &job.Job{ + Name: "ProcessFallbackNotifications", + Retention: job.RetentionFailed, + JobHistory: true, + RunNow: true, + Context: ctx, + Singleton: false, + Schedule: "@every 15s", + Fn: func(ctx job.JobRuntime) error { + var iter int + for { + iter++ + if iter > 3 { + break + } + + done, err := ProcessFallbackNotifications(ctx.Context) + if err != nil { + ctx.History.AddErrorf("failed to process notifications in status=%s : %v", models.NotificationStatusAttemptingFallback, err) + time.Sleep(2 * time.Second) // prevent spinning on db errors + continue + } + + ctx.History.IncrSuccess() + + if done { + break + } + } + + return nil + }, + } +} + +func ProcessFallbackNotifications(parentCtx context.Context) (bool, error) { + var noMorePending bool + + err := parentCtx.DB().Transaction(func(tx *gorm.DB) error { + ctx := parentCtx.WithDB(tx, parentCtx.Pool()) + + var pending []models.NotificationSendHistory + if err := ctx.DB().Clauses(clause.Locking{Strength: clause.LockingStrengthUpdate, Options: clause.LockingOptionsSkipLocked}). + Where("status = ?", models.NotificationStatusAttemptingFallback). + Where("not_before <= NOW()"). + Limit(1). // one at a time; as one notification failure shouldn't affect a previous successful one + Find(&pending).Error; err != nil { + return fmt.Errorf("failed to get notifications to send to fallback: %w", err) + } + + if len(pending) == 0 { + noMorePending = true + return nil + } + + currentHistory := pending[0] + ctx.Logger.V(6).Infof("attempting fallback notification (%s/%s) for resource %s", + currentHistory.ID, + currentHistory.Status, + currentHistory.ResourceID, + ) + + if err := sendFallbackNotification(ctx, currentHistory); err != nil { + if dberr := ctx.DB().Debug().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{ + "status": gorm.Expr("CASE WHEN retries >= ? THEN ? ELSE ? END", ctx.Properties().Int("notification.max-retries", 4)-1, models.NotificationStatusError, models.NotificationStatusCheckingFallback), + "error": err.Error(), // TODO: need to concat with previous error + "retries": gorm.Expr("retries + 1"), + }).Error; dberr != nil { + return ctx.Oops().Join(dberr, err) + } + } + + // we return nil or else the transaction will be rolled back and there'll be no trace of a failed attempt. + return nil + }) + + return noMorePending, err +} + // If the resource is still unhealthy, it returns false func shouldSkipNotificationDueToHealth(ctx context.Context, notif NotificationWithSpec, currentHistory models.NotificationSendHistory) (bool, error) { var payload NotificationEventPayload diff --git a/notification/notification.go b/notification/notification.go index bbb4306d8..f6b8ff0fc 100644 --- a/notification/notification.go +++ b/notification/notification.go @@ -41,7 +41,8 @@ func GetNotificationIDsForEvent(ctx context.Context, eventName string) ([]string // A wrapper around notification that also contains the custom notifications. type NotificationWithSpec struct { models.Notification - CustomNotifications []api.NotificationConfig + CustomNotifications []api.NotificationConfig + FallbackCustomNotification *api.NotificationConfig } func GetNotification(ctx context.Context, id string) (*NotificationWithSpec, error) { @@ -69,6 +70,22 @@ func GetNotification(ctx context.Context, id string) (*NotificationWithSpec, err CustomNotifications: customNotifications, } + if len(n.FallbackCustomServices) > 0 { + b, err := json.Marshal(n.FallbackCustomServices) + if err != nil { + return nil, err + } + + var customNotifications []api.NotificationConfig + if err := json.Unmarshal(b, &customNotifications); err != nil { + return nil, err + } + + if len(customNotifications) > 0 { + data.FallbackCustomNotification = &customNotifications[0] + } + } + notificationByIDCache.Set(id, &data, cache.DefaultExpiration) return &data, nil