diff --git a/api/v1/notification_types.go b/api/v1/notification_types.go index f9a5695ba..b13619441 100644 --- a/api/v1/notification_types.go +++ b/api/v1/notification_types.go @@ -39,6 +39,13 @@ func (t *NotificationRecipientSpec) Empty() bool { return t.Person == "" && t.Team == "" && t.Email == "" && t.Connection == "" && t.URL == "" && t.Playbook == nil } +type NotificationFallback struct { + NotificationRecipientSpec `json:",inline" yaml:",inline"` + + // wait this long before considering a send a failure + Delay string `json:"delay,omitempty" yaml:"delay,omitempty"` +} + // +kubebuilder:object:generate=true type NotificationSpec struct { // List of events that can trigger this notification @@ -56,13 +63,12 @@ type NotificationSpec struct { // RepeatInterval is the waiting time to resend a notification after it has been succefully sent. RepeatInterval string `json:"repeatInterval,omitempty" yaml:"repeatInterval,omitempty"` - // RepeatGroup allows notifications to be grouped by certain set of keys and only send - // one per group within the specified repeat interval. - // RepeatGroup []string `json:"repeatGroup,omitempty" yaml:"repeatGroup,omitempty"` - // Specify the recipient To NotificationRecipientSpec `json:"to" yaml:"to"` + // In case of failure, send the notification to this recipient + Fallback *NotificationFallback `json:"fallback,omitempty" yaml:"fallback,omitempty"` + // WaitFor defines a duration to delay sending a health-based notification. // After this period, the health status is reassessed to confirm it hasn't // changed, helping prevent false alarms from transient issues. 
diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index c63dd7cee..d544a6c0f 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -1448,6 +1448,22 @@ func (in *NotificationAction) DeepCopy() *NotificationAction { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NotificationFallback) DeepCopyInto(out *NotificationFallback) { + *out = *in + in.NotificationRecipientSpec.DeepCopyInto(&out.NotificationRecipientSpec) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NotificationFallback. +func (in *NotificationFallback) DeepCopy() *NotificationFallback { + if in == nil { + return nil + } + out := new(NotificationFallback) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NotificationList) DeepCopyInto(out *NotificationList) { *out = *in @@ -1627,6 +1643,11 @@ func (in *NotificationSpec) DeepCopyInto(out *NotificationSpec) { copy(*out, *in) } in.To.DeepCopyInto(&out.To) + if in.Fallback != nil { + in, out := &in.Fallback, &out.Fallback + *out = new(NotificationFallback) + (*in).DeepCopyInto(*out) + } if in.WaitFor != nil { in, out := &in.WaitFor, &out.WaitFor *out = new(string) diff --git a/config/crds/mission-control.flanksource.com_notifications.yaml b/config/crds/mission-control.flanksource.com_notifications.yaml index 58f7ff493..c5d84893e 100644 --- a/config/crds/mission-control.flanksource.com_notifications.yaml +++ b/config/crds/mission-control.flanksource.com_notifications.yaml @@ -43,6 +43,41 @@ spec: items: type: string type: array + fallback: + description: In case of failure, send the notification to this recipient + properties: + connection: + description: |- + Specify connection string for an external service. 
+ Should be in the format of connection:///name + or the id of the connection. + type: string + delay: + description: wait this long before considering a send a failure + type: string + email: + description: Email of the recipient + type: string + person: + description: ID or email of the person + type: string + playbook: + description: |- + Name or / of the playbook to run. + When a playbook is set as the recipient, a run is triggered. + type: string + properties: + additionalProperties: + type: string + description: Properties for Shoutrrr + type: object + team: + description: name or ID of the recipient team + type: string + url: + description: Specify shoutrrr URL + type: string + type: object filter: description: Cel-expression used to decide whether this notification client should send the notification diff --git a/config/schemas/notification.schema.json b/config/schemas/notification.schema.json index ce1464ab6..68bc7c513 100644 --- a/config/schemas/notification.schema.json +++ b/config/schemas/notification.schema.json @@ -56,6 +56,39 @@ "additionalProperties": false, "type": "object" }, + "NotificationFallback": { + "properties": { + "person": { + "type": "string" + }, + "team": { + "type": "string" + }, + "email": { + "type": "string" + }, + "connection": { + "type": "string" + }, + "url": { + "type": "string" + }, + "properties": { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + "playbook": { + "type": "string" + }, + "delay": { + "type": "string" + } + }, + "additionalProperties": false, + "type": "object" + }, "NotificationRecipientSpec": { "properties": { "person": { @@ -109,6 +142,9 @@ "to": { "$ref": "#/$defs/NotificationRecipientSpec" }, + "fallback": { + "$ref": "#/$defs/NotificationFallback" + }, "waitFor": { "type": "string" }, diff --git a/db/notifications.go b/db/notifications.go index ef747c846..7ac0de8e6 100644 --- a/db/notifications.go +++ b/db/notifications.go @@ -14,6 +14,7 @@ import ( 
"github.com/flanksource/duty/context" "github.com/flanksource/duty/models" "github.com/flanksource/duty/query" + "github.com/flanksource/duty/types" "github.com/flanksource/incident-commander/api" v1 "github.com/flanksource/incident-commander/api/v1" "github.com/google/uuid" @@ -65,76 +66,108 @@ func PersistNotificationFromCRD(ctx context.Context, obj *v1.Notification) error } if len(obj.Spec.GroupBy) > 0 && obj.Spec.WaitFor != nil && *obj.Spec.WaitFor == "" { - return fmt.Errorf("groupBy provided with an empty waitFor. either remove the groupBy or set a waitFor period.") + return fmt.Errorf("groupBy provided with an empty waitFor. either remove the groupBy or set a waitFor period") } + if recipient, err := resolveNotificationRecipient(ctx, obj.Spec.To); err != nil { + return fmt.Errorf("failed to resolve recipient: %w", err) + } else { + dbObj.PersonID = recipient.PersonID + dbObj.TeamID = recipient.TeamID + dbObj.PlaybookID = recipient.PlaybookID + dbObj.CustomServices = recipient.CustomServices + } + + if obj.Spec.Fallback != nil { + if recipient, err := resolveNotificationRecipient(ctx, obj.Spec.Fallback.NotificationRecipientSpec); err != nil { + return fmt.Errorf("failed to resolve recipient: %w", err) + } else { + dbObj.FallbackPersonID = recipient.PersonID + dbObj.FallbackTeamID = recipient.TeamID + dbObj.FallbackPlaybookID = recipient.PlaybookID + dbObj.FallbackCustomServices = recipient.CustomServices + } + } + + return ctx.DB().Save(&dbObj).Error +} + +type notificationRecipient struct { + PersonID *uuid.UUID + TeamID *uuid.UUID + PlaybookID *uuid.UUID + CustomServices types.JSON +} + +func resolveNotificationRecipient(ctx context.Context, recipient v1.NotificationRecipientSpec) (*notificationRecipient, error) { + var result notificationRecipient switch { - case obj.Spec.To.Person != "": - person, err := query.FindPerson(ctx, obj.Spec.To.Person) + case recipient.Person != "": + person, err := query.FindPerson(ctx, recipient.Person) if err != nil { - 
return err + return nil, err } else if person == nil { - return fmt.Errorf("person (%s) not found", obj.Spec.To.Person) + return nil, fmt.Errorf("person (%s) not found", recipient.Person) } - dbObj.PersonID = &person.ID + result.PersonID = &person.ID - case obj.Spec.To.Team != "": - team, err := query.FindTeam(ctx, obj.Spec.To.Team) + case recipient.Team != "": + team, err := query.FindTeam(ctx, recipient.Team) if err != nil { - return err + return nil, err } else if team == nil { - return fmt.Errorf("team (%s) not found", obj.Spec.To.Team) + return nil, fmt.Errorf("team (%s) not found", recipient.Team) } - dbObj.TeamID = &team.ID + result.TeamID = &team.ID - case lo.FromPtr(obj.Spec.To.Playbook) != "": - split := strings.Split(*obj.Spec.To.Playbook, "/") + case lo.FromPtr(recipient.Playbook) != "": + split := strings.Split(*recipient.Playbook, "/") if len(split) == 1 { name := split[0] playbook, err := query.FindPlaybook(ctx, name) if err != nil { - return err + return nil, err } else if playbook == nil { - return fmt.Errorf("playbook (%s) not found", *obj.Spec.To.Playbook) + return nil, fmt.Errorf("playbook (%s) not found", *recipient.Playbook) } - dbObj.PlaybookID = &playbook.ID + result.PlaybookID = &playbook.ID } else if len(split) == 2 { namespace := split[0] name := split[1] var playbook models.Playbook if err := ctx.DB().Where("namespace = ?", namespace).Where("name = ?", name).Find(&playbook).Error; err != nil { - return err + return nil, err } else if playbook.ID == uuid.Nil { - return fmt.Errorf("playbook %s not found", *obj.Spec.To.Playbook) + return nil, fmt.Errorf("playbook %s not found", *recipient.Playbook) } else { - dbObj.PlaybookID = &playbook.ID + result.PlaybookID = &playbook.ID } } default: var customService api.NotificationConfig - if len(obj.Spec.To.Email) != 0 { - customService.URL = fmt.Sprintf("smtp://system/?To=%s", obj.Spec.To.Email) - } else if len(obj.Spec.To.Connection) != 0 { - customService.Connection = obj.Spec.To.Connection - } 
else if len(obj.Spec.To.URL) != 0 { - customService.URL = obj.Spec.To.URL + if len(recipient.Email) != 0 { + customService.URL = fmt.Sprintf("smtp://system/?To=%s", recipient.Email) + } else if len(recipient.Connection) != 0 { + customService.Connection = recipient.Connection + } else if len(recipient.URL) != 0 { + customService.URL = recipient.URL } customServices, err := json.Marshal([]api.NotificationConfig{customService}) if err != nil { - return err + return nil, err } - dbObj.CustomServices = customServices + result.CustomServices = customServices } - return ctx.DB().Save(&dbObj).Error + return &result, nil } func DeleteNotification(ctx context.Context, id string) error { diff --git a/fixtures/notifications/health-playbook.yaml b/fixtures/notifications/health-playbook.yaml index 9fdf6ad34..e03329ca7 100644 --- a/fixtures/notifications/health-playbook.yaml +++ b/fixtures/notifications/health-playbook.yaml @@ -9,4 +9,6 @@ spec: - config.unhealthy - config.warning to: - playbook: mc/echo-config + playbook: mc/diagnose-configs + fallback: + connection: connection://mc/slack diff --git a/go.mod b/go.mod index 453ead1fd..6035cc7f6 100644 --- a/go.mod +++ b/go.mod @@ -368,7 +368,7 @@ require ( sigs.k8s.io/yaml v1.4.0 ) -// replace github.com/flanksource/duty => ../duty +replace github.com/flanksource/duty => ../duty // replace github.com/flanksource/gomplate/v3 => ../gomplat3 diff --git a/go.sum b/go.sum index d68cff9a9..9c6db5a54 100644 --- a/go.sum +++ b/go.sum @@ -287,8 +287,6 @@ github.com/flanksource/artifacts v1.0.14 h1:Vv70bccsae0MwGaf/uSPp34J5V1/PyKfct9z github.com/flanksource/artifacts v1.0.14/go.mod h1:qHVCnQu5k50aWNJ5UhpcAKEl7pAzqUrFFKGSm147G70= github.com/flanksource/commons v1.36.1 h1:SDppXOYO6vk06d12SPOYVZJX+8gV72FmK1l9ar5fkjc= github.com/flanksource/commons v1.36.1/go.mod h1:nsYCn6JOhENXNLR5pz4zNco70DQV+bGObQ8NbOrUeuQ= -github.com/flanksource/duty v1.0.862 h1:VsqkaEKjxezMordMu3YZNd64sWH0/GWvE4uD98ScPWY= -github.com/flanksource/duty v1.0.862/go.mod 
h1:5NmEw4b/vz08vXNR4WPuHWNtD6YB2pGu2RrX1HrDT8E= github.com/flanksource/gomplate/v3 v3.24.55 h1:BW+KeMcggCkNawb4VTbY+Zn3s9eYIhwqb5/myiyJRb8= github.com/flanksource/gomplate/v3 v3.24.55/go.mod h1:hGEObNtnOQs8rNUX8sM8aJTAhnt4ehjyOw1MvDhl6AU= github.com/flanksource/is-healthy v1.0.62 h1:wa4Eq3YR1+Ku3UhKlVpos0CqieGeWWVF5gZeOsmDAIg= diff --git a/jobs/jobs.go b/jobs/jobs.go index 3c2d44101..da2f52d9e 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -10,6 +10,7 @@ import ( "github.com/flanksource/duty/context" "github.com/flanksource/duty/job" "github.com/flanksource/duty/query" + "github.com/flanksource/duty/shutdown" "github.com/flanksource/incident-commander/api" "github.com/flanksource/incident-commander/incidents" "github.com/flanksource/incident-commander/notification" @@ -67,6 +68,10 @@ func Start(ctx context.Context) { logger.Errorf("Failed to schedule job for cleaning up event queue table: %v", err) } + if err := notification.ProcessFallbackCheckNotificationsJob(ctx).AddToScheduler(FuncScheduler); err != nil { + shutdown.ShutdownAndExit(1, fmt.Sprintf("failed to schedule job ProcessFallbackCheckNotificationsJob: %v", err)) + } + if err := notification.ProcessPendingNotificationsJob(ctx).AddToScheduler(FuncScheduler); err != nil { logger.Errorf("failed to schedule job: %v", err) } diff --git a/notification/job.go b/notification/job.go index 3f3550be7..d67e15949 100644 --- a/notification/job.go +++ b/notification/job.go @@ -203,7 +203,7 @@ func ProcessPendingNotifications(parentCtx context.Context) (bool, error) { if err := processPendingNotification(ctx, currentHistory, []models.NotificationSendHistory{}); err != nil { if dberr := ctx.DB().Debug().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{ - "status": gorm.Expr("CASE WHEN retries >= ? THEN ? ELSE ? 
END", ctx.Properties().Int("notification.max-retries", 4)-1, models.NotificationStatusError, models.NotificationStatusPending), + "status": gorm.Expr("CASE WHEN retries >= ? THEN ? ELSE ? END", ctx.Properties().Int("notification.max-retries", 4)-1, models.NotificationStatusCheckingFallback, models.NotificationStatusPending), "error": err.Error(), "retries": gorm.Expr("retries + 1"), }).Error; dberr != nil { @@ -283,7 +283,7 @@ func ProcessPendingGroupedNotifications(parentCtx context.Context) error { historyIDs := lo.Map(pending, func(h models.NotificationSendHistory, _ int) uuid.UUID { return h.ID }) if err := processPendingNotification(ctx, currentHistory, groupedHistory); err != nil { if dberr := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id IN ?", historyIDs).UpdateColumns(map[string]any{ - "status": gorm.Expr("CASE WHEN retries >= ? THEN ? ELSE ? END", ctx.Properties().Int("notification.max-retries", 4)-1, models.NotificationStatusError, models.NotificationStatusPending), + "status": gorm.Expr("CASE WHEN retries >= ? THEN ? ELSE ? 
END", ctx.Properties().Int("notification.max-retries", 4)-1, models.NotificationStatusCheckingFallback, models.NotificationStatusPending), "error": err.Error(), "retries": gorm.Expr("retries + 1"), }).Error; dberr != nil { @@ -303,6 +303,113 @@ func ProcessPendingGroupedNotifications(parentCtx context.Context) error { return nil } +// ProcessFallbackCheckNotificationsJob returns a job that, every 15s, drains the send histories awaiting a fallback check in batches. +// A failed batch is recorded on the job history and retried after 2s; the run gives up after more than 3 failures. +func ProcessFallbackCheckNotificationsJob(ctx context.Context) *job.Job { + return &job.Job{ + Name: "ProcessFallbackCheckNotifications", + Retention: job.RetentionFailed, + JobHistory: true, + RunNow: true, + Context: ctx, + Singleton: false, + Schedule: "@every 15s", + Fn: func(ctx job.JobRuntime) error { + var errCount int + for { + done, err := ProcessFallbackCheckNotifications(ctx.Context) + if err != nil { + ctx.History.AddErrorf("failed to process notifications in status=%s : %v", models.NotificationStatusCheckingFallback, err) + + errCount++ + if errCount > 3 { + // avoid getting stuck in a loop + break + } + + time.Sleep(2 * time.Second) // prevent spinning on db errors + continue + } + + ctx.History.IncrSuccess() + + if done { + break + } + } + + return nil + }, + } +} + +// ProcessFallbackCheckNotifications moves up to 100 due send histories out of the checking-fallback status in a single transaction. +// Histories whose notification has a fallback recipient and that are not already fallback attempts are marked as attempting fallback; the rest are marked as errored. +// It returns true when no due histories were found. +func ProcessFallbackCheckNotifications(parentCtx context.Context) (bool, error) { + var noMorePending bool + + err := parentCtx.DB().Transaction(func(tx *gorm.DB) error { + ctx := parentCtx.WithDB(tx, parentCtx.Pool()) + + var pending []models.NotificationSendHistory + if err := ctx.DB().Clauses(clause.Locking{Strength: clause.LockingStrengthUpdate, Options: clause.LockingOptionsSkipLocked}). + Where("status = ?", models.NotificationStatusCheckingFallback). + Where("not_before <= NOW() OR not_before IS NULL"). + Order("not_before"). + Limit(100).
// safeguard limit + Find(&pending).Error; err != nil { + return fmt.Errorf("failed to get notifications awaiting fallback checks: %w", err) + } + + if len(pending) == 0 { + noMorePending = true + return nil + } + + var ( + noFallback []string // send histories with no fallback configured, or that already are fallback attempts + shouldFallback []string // list of notification send histories that should be retried to fallback recipients + ) + for _, history := range pending { + notif, err := GetNotification(ctx, history.NotificationID.String()) + if err != nil { + return fmt.Errorf("failed to get notification: %w", err) + } + + // a fallback may be a person, team or playbook, not only custom services + hasFallback := notif.FallbackPersonID != nil || notif.FallbackTeamID != nil || notif.FallbackPlaybookID != nil || len(notif.FallbackCustomServices) != 0 + if hasFallback && !history.IsFallback { + shouldFallback = append(shouldFallback, history.ID.String()) + } else { + noFallback = append(noFallback, history.ID.String()) + } + } + + if len(shouldFallback) > 0 { + if err := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id IN ?", shouldFallback).UpdateColumns(map[string]any{ + "status": models.NotificationStatusAttemptingFallback, + "is_fallback": true, + }).Error; err != nil { + return fmt.Errorf("failed to update notification status to attempting fallback: %w", err) + } + } + + if len(noFallback) > 0 { + if err := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id IN ?", noFallback).UpdateColumns(map[string]any{ + "status": models.NotificationStatusError, + }).Error; err != nil { + return fmt.Errorf("failed to update notification status to error: %w", err) + } + } + + // every status update succeeded; returning nil commits the transaction.
+ return nil + }) + + return noMorePending, err +} + // If the resource is still unhealthy, it returns false func shouldSkipNotificationDueToHealth(ctx context.Context, notif NotificationWithSpec, currentHistory models.NotificationSendHistory) (bool, error) { var payload NotificationEventPayload diff --git a/playbook/events.go b/playbook/events.go index 5a6a36678..f6a9e1116 100644 --- a/playbook/events.go +++ b/playbook/events.go @@ -312,7 +312,7 @@ func onNewRun(ctx context.Context, event models.Event) error { columnUpdates["status"] = models.NotificationStatusPendingPlaybookCompletion } else { columnUpdates["error"] = err.Error() - columnUpdates["status"] = models.NotificationStatusError + columnUpdates["status"] = models.NotificationStatusCheckingFallback } if err := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id = ?", notificationDispatchID).UpdateColumns(columnUpdates).Error; err != nil {