Skip to content

Commit

Permalink
feat: process NotificationStatusAttemptingFallback
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Feb 26, 2025
1 parent b7026b7 commit a5650b7
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 4 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ require (
sigs.k8s.io/yaml v1.4.0
)

replace github.com/flanksource/duty => ../duty
// replace github.com/flanksource/duty => ../duty

// replace github.com/flanksource/gomplate/v3 => ../gomplat3

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ github.com/flanksource/artifacts v1.0.14 h1:Vv70bccsae0MwGaf/uSPp34J5V1/PyKfct9z
github.com/flanksource/artifacts v1.0.14/go.mod h1:qHVCnQu5k50aWNJ5UhpcAKEl7pAzqUrFFKGSm147G70=
github.com/flanksource/commons v1.36.1 h1:SDppXOYO6vk06d12SPOYVZJX+8gV72FmK1l9ar5fkjc=
github.com/flanksource/commons v1.36.1/go.mod h1:nsYCn6JOhENXNLR5pz4zNco70DQV+bGObQ8NbOrUeuQ=
github.com/flanksource/duty v1.0.862 h1:VsqkaEKjxezMordMu3YZNd64sWH0/GWvE4uD98ScPWY=
github.com/flanksource/duty v1.0.862/go.mod h1:5NmEw4b/vz08vXNR4WPuHWNtD6YB2pGu2RrX1HrDT8E=
github.com/flanksource/gomplate/v3 v3.24.55 h1:BW+KeMcggCkNawb4VTbY+Zn3s9eYIhwqb5/myiyJRb8=
github.com/flanksource/gomplate/v3 v3.24.55/go.mod h1:hGEObNtnOQs8rNUX8sM8aJTAhnt4ehjyOw1MvDhl6AU=
github.com/flanksource/is-healthy v1.0.62 h1:wa4Eq3YR1+Ku3UhKlVpos0CqieGeWWVF5gZeOsmDAIg=
Expand Down
4 changes: 4 additions & 0 deletions jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func Start(ctx context.Context) {
shutdown.ShutdownAndExit(1, fmt.Sprintf("failed to schedule job ProcessFallbackCheckNotificationsJob: %v", err))
}

if err := notification.ProcessFallbackNotificationsJob(ctx).AddToScheduler(FuncScheduler); err != nil {
shutdown.ShutdownAndExit(1, fmt.Sprintf("failed to schedule job ProcessFallbackNotificationsJob: %v", err))
}

if err := notification.ProcessPendingNotificationsJob(ctx).AddToScheduler(FuncScheduler); err != nil {
logger.Errorf("failed to schedule job: %v", err)
}
Expand Down
28 changes: 27 additions & 1 deletion notification/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,31 @@ func sendNotifications(ctx context.Context, events models.Events) models.Events
return failedEvents
}

// sendFallbackNotification re-sends the notification recorded in sendHistory
// to the fallback recipients configured on its parent notification.
//
// The event payload is restored from the history row, and its person, team,
// playbook and custom-service targets are replaced with the notification's
// fallback counterparts before sending. When the send succeeds, the history
// row's status is updated to sent.
//
// It returns an error if the notification cannot be loaded, if the send
// fails, or if the status update fails.
func sendFallbackNotification(ctx context.Context, sendHistory models.NotificationSendHistory) error {
	notification, err := GetNotification(ctx, sendHistory.NotificationID.String())
	if err != nil {
		return fmt.Errorf("failed to get notification[%s]: %w", sendHistory.NotificationID, err)
	}

	var fallbackPayload NotificationEventPayload
	fallbackPayload.FromMap(sendHistory.Payload)

	// Redirect every recipient to its fallback counterpart.
	fallbackPayload.PersonID = notification.FallbackPersonID
	fallbackPayload.TeamID = notification.FallbackTeamID
	fallbackPayload.PlaybookID = notification.FallbackPlaybookID
	fallbackPayload.CustomService = notification.FallbackCustomNotification

	if sendErr := sendPendingNotification(ctx, sendHistory, fallbackPayload); sendErr != nil {
		return fmt.Errorf("failed to send notification: %w", sendErr)
	}

	markSent := map[string]any{
		"status": models.NotificationStatusSent,
	}
	result := ctx.DB().
		Model(&models.NotificationSendHistory{}).
		Where("id = ?", sendHistory.ID).
		UpdateColumns(markSent)
	if result.Error != nil {
		return fmt.Errorf("failed to save notification status as sent: %w", result.Error)
	}

	return nil
}

func sendPendingNotification(ctx context.Context, history models.NotificationSendHistory, payload NotificationEventPayload) error {
notificationContext := NewContext(ctx, payload.NotificationID).WithHistory(history)

Expand Down Expand Up @@ -396,12 +421,13 @@ func _sendNotification(ctx *Context, noWait bool, payload NotificationEventPaylo
return fmt.Errorf("failed to get notification: %w", err)
}

ctx.log.Payload = payload.AsMap()

if !noWait && nn.WaitFor != nil {
// delayed notifications are saved to history with a pending status
// and are later consumed by a job.
ctx.log.NotBefore = lo.ToPtr(ctx.log.CreatedAt.Add(*nn.WaitFor))
ctx.log.Status = models.NotificationStatusPending
ctx.log.Payload = payload.AsMap()
} else {
if payload.PlaybookID != nil {
if err := triggerPlaybookRun(ctx, celEnv, *payload.PlaybookID); err != nil {
Expand Down
83 changes: 82 additions & 1 deletion notification/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func ProcessFallbackCheckNotificationsJob(ctx context.Context) *job.Job {
RunNow: true,
Context: ctx,
Singleton: false,
Schedule: "@every 15s",
Schedule: "@every 30s",
Fn: func(ctx job.JobRuntime) error {
var errCount int
for {
Expand Down Expand Up @@ -383,6 +383,7 @@ func ProcessFallbackCheckNotifications(parentCtx context.Context) (bool, error)
if err := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id IN ?", shouldFallback).UpdateColumns(map[string]any{
"status": models.NotificationStatusAttemptingFallback,
"is_fallback": true,
"not_before": gorm.Expr("CASE WHEN not_before IS NULL THEN NOW() + INTERVAL '1 MINUTE' ELSE not_before + INTERVAL '1 minute' END"),
}).Error; err != nil {
return fmt.Errorf("failed to update notification status to attempting fallback: %w", err)
}
Expand All @@ -403,6 +404,86 @@ func ProcessFallbackCheckNotifications(parentCtx context.Context) (bool, error)
return noMorePending, err
}

// ProcessFallbackNotificationsJob builds the scheduled job that delivers
// notifications in the attempting-fallback status.
//
// Each run calls ProcessFallbackNotifications at most three times and stops
// early once it reports that nothing is left to process. A failed call is
// recorded on the job history, followed by a short pause, and still counts
// towards the three attempts. The job function itself always returns nil.
func ProcessFallbackNotificationsJob(ctx context.Context) *job.Job {
	const maxIterations = 3

	run := func(rt job.JobRuntime) error {
		for attempt := 1; attempt <= maxIterations; attempt++ {
			finished, err := ProcessFallbackNotifications(rt.Context)
			if err != nil {
				rt.History.AddErrorf("failed to process notifications in status=%s : %v", models.NotificationStatusAttemptingFallback, err)
				time.Sleep(2 * time.Second) // prevent spinning on db errors
				continue
			}

			rt.History.IncrSuccess()

			if finished {
				break
			}
		}

		return nil
	}

	return &job.Job{
		Name:       "ProcessFallbackNotifications",
		Retention:  job.RetentionFailed,
		JobHistory: true,
		RunNow:     true,
		Context:    ctx,
		Singleton:  false,
		Schedule:   "@every 15s",
		Fn:         run,
	}
}

// ProcessFallbackNotifications attempts delivery of at most one notification
// send history that is in the attempting-fallback status and whose not_before
// time has passed.
//
// The row is selected with FOR UPDATE SKIP LOCKED inside a transaction and
// handed to sendFallbackNotification. When the send fails, the failure is
// recorded on the row: the error message is stored, retries is incremented,
// and the status becomes error once the "notification.max-retries" property
// (default 4) is reached, or checking-fallback otherwise.
//
// The returned bool is true when no eligible row was found. The returned
// error is non-nil only when the select query or the failure bookkeeping
// fails; a failed send on its own does not produce an error.
func ProcessFallbackNotifications(parentCtx context.Context) (bool, error) {
	var noMorePending bool

	err := parentCtx.DB().Transaction(func(tx *gorm.DB) error {
		ctx := parentCtx.WithDB(tx, parentCtx.Pool())

		var pending []models.NotificationSendHistory
		if err := ctx.DB().Clauses(clause.Locking{Strength: clause.LockingStrengthUpdate, Options: clause.LockingOptionsSkipLocked}).
			Where("status = ?", models.NotificationStatusAttemptingFallback).
			Where("not_before <= NOW()").
			Limit(1). // one at a time; as one notification failure shouldn't affect a previous successful one
			Find(&pending).Error; err != nil {
			return fmt.Errorf("failed to get notifications to send to fallback: %w", err)
		}

		if len(pending) == 0 {
			noMorePending = true
			return nil
		}

		currentHistory := pending[0]
		ctx.Logger.V(6).Infof("attempting fallback notification (%s/%s) for resource %s",
			currentHistory.ID,
			currentHistory.Status,
			currentHistory.ResourceID,
		)

		if err := sendFallbackNotification(ctx, currentHistory); err != nil {
			maxRetries := ctx.Properties().Int("notification.max-retries", 4)

			// No Debug() here: it would print this statement on every failed
			// send regardless of the configured log level.
			if dberr := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{
				"status":  gorm.Expr("CASE WHEN retries >= ? THEN ? ELSE ? END", maxRetries-1, models.NotificationStatusError, models.NotificationStatusCheckingFallback),
				"error":   err.Error(), // TODO: need to concat with previous error
				"retries": gorm.Expr("retries + 1"),
			}).Error; dberr != nil {
				return ctx.Oops().Join(dberr, err)
			}
		}

		// we return nil or else the transaction will be rolled back and there'll be no trace of a failed attempt.
		return nil
	})

	return noMorePending, err
}

// If the resource is still unhealthy, it returns false
func shouldSkipNotificationDueToHealth(ctx context.Context, notif NotificationWithSpec, currentHistory models.NotificationSendHistory) (bool, error) {
var payload NotificationEventPayload
Expand Down
19 changes: 18 additions & 1 deletion notification/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func GetNotificationIDsForEvent(ctx context.Context, eventName string) ([]string
// NotificationWithSpec wraps a models.Notification together with its custom
// notification configs, including the one used for fallback delivery.
type NotificationWithSpec struct {
models.Notification
CustomNotifications []api.NotificationConfig
CustomNotifications []api.NotificationConfig
// FallbackCustomNotification is set by GetNotification to the first entry of
// the notification's FallbackCustomServices; it stays nil when there are none.
FallbackCustomNotification *api.NotificationConfig
}

func GetNotification(ctx context.Context, id string) (*NotificationWithSpec, error) {
Expand Down Expand Up @@ -69,6 +70,22 @@ func GetNotification(ctx context.Context, id string) (*NotificationWithSpec, err
CustomNotifications: customNotifications,
}

if len(n.FallbackCustomServices) > 0 {
b, err := json.Marshal(n.FallbackCustomServices)
if err != nil {
return nil, err
}

var customNotifications []api.NotificationConfig
if err := json.Unmarshal(b, &customNotifications); err != nil {
return nil, err
}

if len(customNotifications) > 0 {
data.FallbackCustomNotification = &customNotifications[0]
}
}

notificationByIDCache.Set(id, &data, cache.DefaultExpiration)

return &data, nil
Expand Down

0 comments on commit a5650b7

Please sign in to comment.