diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index ffe24a7fc90..7002e011d67 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "reflect" "testing" "time" @@ -259,6 +260,10 @@ func TestLauncher_Pkger(t *testing.T) { }) t.Run("an error during application roles back resources to previous state", func(t *testing.T) { + if reflect.DeepEqual(initialSum, pkger.Summary{}) { + t.Skip("test setup not complete") + } + logger := l.log.With(zap.String("service", "pkger")) var svc pkger.SVC = pkger.NewService( pkger.WithLogger(logger), diff --git a/http/swagger.yml b/http/swagger.yml index c464a7ea0f2..916b89bd1ed 100644 --- a/http/swagger.yml +++ b/http/swagger.yml @@ -7657,8 +7657,8 @@ components: items: type: object properties: - remove: - type: boolean + stateStatus: + type: string id: type: string pkgName: diff --git a/pkger/models.go b/pkger/models.go index ee8d98c8949..b7269f8aa2b 100644 --- a/pkger/models.go +++ b/pkger/models.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "fmt" - "net/url" "reflect" "regexp" "sort" @@ -391,26 +390,6 @@ type DiffNotificationEndpoint struct { Old *DiffNotificationEndpointValues `json:"old"` } -func newDiffNotificationEndpoint(ne *notificationEndpoint, i influxdb.NotificationEndpoint) DiffNotificationEndpoint { - diff := DiffNotificationEndpoint{ - DiffIdentifier: DiffIdentifier{ - ID: SafeID(ne.ID()), - Remove: ne.shouldRemove, - PkgName: ne.PkgName(), - }, - New: DiffNotificationEndpointValues{ - NotificationEndpoint: ne.summarize().NotificationEndpoint, - }, - } - if i != nil { - diff.ID = SafeID(i.GetID()) - diff.Old = &DiffNotificationEndpointValues{ - NotificationEndpoint: i, - } - } - return diff -} - type ( // DiffNotificationRule is a diff of an individual notification rule. DiffNotificationRule struct { @@ -877,309 +856,6 @@ type SummaryVariable struct { LabelAssociations []SummaryLabel `json:"labelAssociations"` } -type thresholdType string - -const ( - thresholdTypeGreater thresholdType = "greater" - thresholdTypeLesser thresholdType = "lesser" - thresholdTypeInsideRange thresholdType = "inside_range" - thresholdTypeOutsideRange thresholdType = "outside_range" -) - -var thresholdTypes = map[thresholdType]bool{ - thresholdTypeGreater: true, - thresholdTypeLesser: true, - thresholdTypeInsideRange: true, - thresholdTypeOutsideRange: true, -} - -type threshold struct { - threshType thresholdType - allVals bool - level string - val float64 - min, max float64 -} - -func (t threshold) valid() []validationErr { - var vErrs []validationErr - if notification.ParseCheckLevel(t.level) == notification.Unknown { - vErrs = append(vErrs, validationErr{ - Field: fieldLevel, - Msg: fmt.Sprintf("must be 1 in [CRIT, WARN, INFO, OK]; got=%q", t.level), - }) - } - if !thresholdTypes[t.threshType] { - vErrs = append(vErrs, validationErr{ - Field: fieldType, - Msg: fmt.Sprintf("must be 1 in [Lesser, Greater, Inside_Range, Outside_Range]; got=%q", t.threshType), - }) - } - if t.min > t.max { - vErrs = append(vErrs, validationErr{ - Field: fieldMin, - Msg: "min must be < max", - }) - } - return vErrs -} - -func toInfluxThresholds(thresholds ...threshold) []icheck.ThresholdConfig { - var iThresh []icheck.ThresholdConfig - for _, th := range thresholds { - base := icheck.ThresholdConfigBase{ - AllValues: th.allVals, - Level: notification.ParseCheckLevel(th.level), - } - switch th.threshType { - case thresholdTypeGreater: - iThresh = append(iThresh, icheck.Greater{ - ThresholdConfigBase: base, - Value: th.val, - }) - case thresholdTypeLesser: - iThresh = append(iThresh, icheck.Lesser{ - ThresholdConfigBase: base, - Value: th.val, - }) - case thresholdTypeInsideRange, thresholdTypeOutsideRange: - iThresh = append(iThresh, icheck.Range{ - ThresholdConfigBase: base, - Max: th.max, - Min: th.min, - Within: th.threshType == thresholdTypeInsideRange, - }) - } - } - return iThresh -} - -type notificationKind int - -const ( - notificationKindHTTP notificationKind = iota + 1 - notificationKindPagerDuty - notificationKindSlack -) - -const ( - notificationHTTPAuthTypeBasic = "basic" - notificationHTTPAuthTypeBearer = "bearer" - notificationHTTPAuthTypeNone = "none" -) - -const ( - fieldNotificationEndpointHTTPMethod = "method" - fieldNotificationEndpointPassword = "password" - fieldNotificationEndpointRoutingKey = "routingKey" - fieldNotificationEndpointToken = "token" - fieldNotificationEndpointURL = "url" - fieldNotificationEndpointUsername = "username" -) - -type notificationEndpoint struct { - identity - - kind notificationKind - id influxdb.ID - OrgID influxdb.ID - description string - method string - password *references - routingKey *references - status string - token *references - httpType string - url string - username *references - - labels sortedLabels - - existing influxdb.NotificationEndpoint -} - -func (n *notificationEndpoint) Exists() bool { - return n.existing != nil -} - -func (n *notificationEndpoint) ID() influxdb.ID { - if n.existing != nil { - return n.existing.GetID() - } - return n.id -} - -func (n *notificationEndpoint) Labels() []*label { - return n.labels -} - -func (n *notificationEndpoint) ResourceType() influxdb.ResourceType { - return KindNotificationEndpointSlack.ResourceType() -} - -func (n *notificationEndpoint) base() endpoint.Base { - e := endpoint.Base{ - Name: n.Name(), - Description: n.description, - Status: n.influxStatus(), - } - if id := n.ID(); id > 0 { - e.ID = &id - } - if orgID := n.OrgID; orgID > 0 { - e.OrgID = &orgID - } - return e -} - -func (n *notificationEndpoint) summarize() SummaryNotificationEndpoint { - base := n.base() - sum := SummaryNotificationEndpoint{ - PkgName: n.PkgName(), - LabelAssociations: toSummaryLabels(n.labels...), - } - - switch n.kind { - case notificationKindHTTP: - e := &endpoint.HTTP{ - Base: base, - URL: n.url, - Method: n.method, - } - switch n.httpType { - case notificationHTTPAuthTypeBasic: - e.AuthMethod = notificationHTTPAuthTypeBasic - e.Password = n.password.SecretField() - e.Username = n.username.SecretField() - case notificationHTTPAuthTypeBearer: - e.AuthMethod = notificationHTTPAuthTypeBearer - e.Token = n.token.SecretField() - case notificationHTTPAuthTypeNone: - e.AuthMethod = notificationHTTPAuthTypeNone - } - sum.NotificationEndpoint = e - case notificationKindPagerDuty: - sum.NotificationEndpoint = &endpoint.PagerDuty{ - Base: base, - ClientURL: n.url, - RoutingKey: n.routingKey.SecretField(), - } - case notificationKindSlack: - sum.NotificationEndpoint = &endpoint.Slack{ - Base: base, - URL: n.url, - Token: n.token.SecretField(), - } - } - return sum -} - -func (n *notificationEndpoint) influxStatus() influxdb.Status { - status := influxdb.Active - if n.status != "" { - status = influxdb.Status(n.status) - } - return status -} - -var validEndpointHTTPMethods = map[string]bool{ - "DELETE": true, - "GET": true, - "HEAD": true, - "OPTIONS": true, - "PATCH": true, - "POST": true, - "PUT": true, -} - -func (n *notificationEndpoint) valid() []validationErr { - var failures []validationErr - if _, err := url.Parse(n.url); err != nil || n.url == "" { - failures = append(failures, validationErr{ - Field: fieldNotificationEndpointURL, - Msg: "must be valid url", - }) - } - - status := influxdb.Status(n.status) - if status != "" && influxdb.Inactive != status && influxdb.Active != status { - failures = append(failures, validationErr{ - Field: fieldStatus, - Msg: "not a valid status; valid statues are one of [active, inactive]", - }) - } - - switch n.kind { - case notificationKindPagerDuty: - if !n.routingKey.hasValue() { - failures = append(failures, validationErr{ - Field: fieldNotificationEndpointRoutingKey, - Msg: "must be provide", - }) - } - case notificationKindHTTP: - if !validEndpointHTTPMethods[n.method] { - failures = append(failures, validationErr{ - Field: fieldNotificationEndpointHTTPMethod, - Msg: "http method must be a valid HTTP verb", - }) - } - - switch n.httpType { - case notificationHTTPAuthTypeBasic: - if !n.password.hasValue() { - failures = append(failures, validationErr{ - Field: fieldNotificationEndpointPassword, - Msg: "must provide non empty string", - }) - } - if !n.username.hasValue() { - failures = append(failures, validationErr{ - Field: fieldNotificationEndpointUsername, - Msg: "must provide non empty string", - }) - } - case notificationHTTPAuthTypeBearer: - if !n.token.hasValue() { - failures = append(failures, validationErr{ - Field: fieldNotificationEndpointToken, - Msg: "must provide non empty string", - }) - } - case notificationHTTPAuthTypeNone: - default: - failures = append(failures, validationErr{ - Field: fieldType, - Msg: fmt.Sprintf( - "invalid type provided %q; valid type is 1 in [%s, %s, %s]", - n.httpType, - notificationHTTPAuthTypeBasic, - notificationHTTPAuthTypeBearer, - notificationHTTPAuthTypeNone, - ), - }) - } - } - - if len(failures) > 0 { - return []validationErr{ - objectValidationErr(fieldSpec, failures...), - } - } - - return nil -} - -type mapperNotificationEndpoints []*notificationEndpoint - -func (n mapperNotificationEndpoints) Association(i int) labelAssociater { - return n[i] -} - -func (n mapperNotificationEndpoints) Len() int { - return len(n) -} - const ( fieldNotificationRuleChannel = "channel" fieldNotificationRuleCurrentLevel = "currentLevel" diff --git a/pkger/models_test.go b/pkger/models_test.go index 99750f81c90..72f6b725631 100644 --- a/pkger/models_test.go +++ b/pkger/models_test.go @@ -486,11 +486,6 @@ func TestPkg(t *testing.T) { kind: KindLabel, validName: "label_1", }, - { - pkgFile: "testdata/notification_endpoint.yml", - kind: KindNotificationEndpoint, - validName: "slack_notification_endpoint", - }, { pkgFile: "testdata/notification_rule.yml", kind: KindNotificationRule, diff --git a/pkger/parser.go b/pkger/parser.go index 80020b02aae..77e66e8e485 100644 --- a/pkger/parser.go +++ b/pkger/parser.go @@ -419,14 +419,6 @@ func (p *Pkg) addObjectForRemoval(k Kind, pkgName string, id influxdb.ID) { identity: newIdentity, id: id, } - case KindNotificationEndpoint, - KindNotificationEndpointHTTP, - KindNotificationEndpointPagerDuty, - KindNotificationEndpointSlack: - p.mNotificationEndpoints[pkgName] = ¬ificationEndpoint{ - identity: newIdentity, - id: id, - } case KindNotificationRule: p.mNotificationRules[pkgName] = ¬ificationRule{ identity: newIdentity, @@ -457,14 +449,6 @@ func (p *Pkg) getObjectIDSetter(k Kind, pkgName string) (func(influxdb.ID), bool return func(id influxdb.ID) { l.id = id }, ok - case KindNotificationEndpoint, - KindNotificationEndpointHTTP, - KindNotificationEndpointPagerDuty, - KindNotificationEndpointSlack: - e, ok := p.mNotificationEndpoints[pkgName] - return func(id influxdb.ID) { - e.id = id - }, ok case KindNotificationRule: r, ok := p.mNotificationRules[pkgName] return func(id influxdb.ID) { @@ -941,7 +925,7 @@ func (p *Pkg) graphNotificationEndpoints() *parseErr { notificationKinds := []struct { kind Kind - notificationKind notificationKind + notificationKind notificationEndpointKind }{ { kind: KindNotificationEndpointHTTP, diff --git a/pkger/parser_models.go b/pkger/parser_models.go index d0a24a79aef..5fea5edace6 100644 --- a/pkger/parser_models.go +++ b/pkger/parser_models.go @@ -2,6 +2,7 @@ package pkger import ( "fmt" + "net/url" "strconv" "strings" "time" @@ -9,6 +10,7 @@ import ( "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/notification" icheck "github.com/influxdata/influxdb/v2/notification/check" + "github.com/influxdata/influxdb/v2/notification/endpoint" ) type identity struct { @@ -571,3 +573,274 @@ func (v *variable) valid() []validationErr { return nil } + +type thresholdType string + +const ( + thresholdTypeGreater thresholdType = "greater" + thresholdTypeLesser thresholdType = "lesser" + thresholdTypeInsideRange thresholdType = "inside_range" + thresholdTypeOutsideRange thresholdType = "outside_range" +) + +var thresholdTypes = map[thresholdType]bool{ + thresholdTypeGreater: true, + thresholdTypeLesser: true, + thresholdTypeInsideRange: true, + thresholdTypeOutsideRange: true, +} + +type threshold struct { + threshType thresholdType + allVals bool + level string + val float64 + min, max float64 +} + +func (t threshold) valid() []validationErr { + var vErrs []validationErr + if notification.ParseCheckLevel(t.level) == notification.Unknown { + vErrs = append(vErrs, validationErr{ + Field: fieldLevel, + Msg: fmt.Sprintf("must be 1 in [CRIT, WARN, INFO, OK]; got=%q", t.level), + }) + } + if !thresholdTypes[t.threshType] { + vErrs = append(vErrs, validationErr{ + Field: fieldType, + Msg: fmt.Sprintf("must be 1 in [Lesser, Greater, Inside_Range, Outside_Range]; got=%q", t.threshType), + }) + } + if t.min > t.max { + vErrs = append(vErrs, validationErr{ + Field: fieldMin, + Msg: "min must be < max", + }) + } + return vErrs +} + +func toInfluxThresholds(thresholds ...threshold) []icheck.ThresholdConfig { + var iThresh []icheck.ThresholdConfig + for _, th := range thresholds { + base := icheck.ThresholdConfigBase{ + AllValues: th.allVals, + Level: notification.ParseCheckLevel(th.level), + } + switch th.threshType { + case thresholdTypeGreater: + iThresh = append(iThresh, icheck.Greater{ + ThresholdConfigBase: base, + Value: th.val, + }) + case thresholdTypeLesser: + iThresh = append(iThresh, icheck.Lesser{ + ThresholdConfigBase: base, + Value: th.val, + }) + case thresholdTypeInsideRange, thresholdTypeOutsideRange: + iThresh = append(iThresh, icheck.Range{ + ThresholdConfigBase: base, + Max: th.max, + Min: th.min, + Within: th.threshType == thresholdTypeInsideRange, + }) + } + } + return iThresh +} + +type notificationEndpointKind int + +const ( + notificationKindHTTP notificationEndpointKind = iota + 1 + notificationKindPagerDuty + notificationKindSlack +) + +const ( + notificationHTTPAuthTypeBasic = "basic" + notificationHTTPAuthTypeBearer = "bearer" + notificationHTTPAuthTypeNone = "none" +) + +const ( + fieldNotificationEndpointHTTPMethod = "method" + fieldNotificationEndpointPassword = "password" + fieldNotificationEndpointRoutingKey = "routingKey" + fieldNotificationEndpointToken = "token" + fieldNotificationEndpointURL = "url" + fieldNotificationEndpointUsername = "username" +) + +type notificationEndpoint struct { + identity + + kind notificationEndpointKind + description string + method string + password *references + routingKey *references + status string + token *references + httpType string + url string + username *references + + labels sortedLabels +} + +func (n *notificationEndpoint) Labels() []*label { + return n.labels +} + +func (n *notificationEndpoint) ResourceType() influxdb.ResourceType { + return KindNotificationEndpointSlack.ResourceType() +} + +func (n *notificationEndpoint) base() endpoint.Base { + return endpoint.Base{ + Name: n.Name(), + Description: n.description, + Status: n.influxStatus(), + } +} + +func (n *notificationEndpoint) summarize() SummaryNotificationEndpoint { + base := n.base() + sum := SummaryNotificationEndpoint{ + PkgName: n.PkgName(), + LabelAssociations: toSummaryLabels(n.labels...), + } + + switch n.kind { + case notificationKindHTTP: + e := &endpoint.HTTP{ + Base: base, + URL: n.url, + Method: n.method, + } + switch n.httpType { + case notificationHTTPAuthTypeBasic: + e.AuthMethod = notificationHTTPAuthTypeBasic + e.Password = n.password.SecretField() + e.Username = n.username.SecretField() + case notificationHTTPAuthTypeBearer: + e.AuthMethod = notificationHTTPAuthTypeBearer + e.Token = n.token.SecretField() + case notificationHTTPAuthTypeNone: + e.AuthMethod = notificationHTTPAuthTypeNone + } + sum.NotificationEndpoint = e + case notificationKindPagerDuty: + sum.NotificationEndpoint = &endpoint.PagerDuty{ + Base: base, + ClientURL: n.url, + RoutingKey: n.routingKey.SecretField(), + } + case notificationKindSlack: + sum.NotificationEndpoint = &endpoint.Slack{ + Base: base, + URL: n.url, + Token: n.token.SecretField(), + } + } + return sum +} + +func (n *notificationEndpoint) influxStatus() influxdb.Status { + status := influxdb.Active + if n.status != "" { + status = influxdb.Status(n.status) + } + return status +} + +var validEndpointHTTPMethods = map[string]bool{ + "DELETE": true, + "GET": true, + "HEAD": true, + "OPTIONS": true, + "PATCH": true, + "POST": true, + "PUT": true, +} + +func (n *notificationEndpoint) valid() []validationErr { + var failures []validationErr + if _, err := url.Parse(n.url); err != nil || n.url == "" { + failures = append(failures, validationErr{ + Field: fieldNotificationEndpointURL, + Msg: "must be valid url", + }) + } + + status := influxdb.Status(n.status) + if status != "" && influxdb.Inactive != status && influxdb.Active != status { + failures = append(failures, validationErr{ + Field: fieldStatus, + Msg: "not a valid status; valid statues are one of [active, inactive]", + }) + } + + switch n.kind { + case notificationKindPagerDuty: + if !n.routingKey.hasValue() { + failures = append(failures, validationErr{ + Field: fieldNotificationEndpointRoutingKey, + Msg: "must be provide", + }) + } + case notificationKindHTTP: + if !validEndpointHTTPMethods[n.method] { + failures = append(failures, validationErr{ + Field: fieldNotificationEndpointHTTPMethod, + Msg: "http method must be a valid HTTP verb", + }) + } + + switch n.httpType { + case notificationHTTPAuthTypeBasic: + if !n.password.hasValue() { + failures = append(failures, validationErr{ + Field: fieldNotificationEndpointPassword, + Msg: "must provide non empty string", + }) + } + if !n.username.hasValue() { + failures = append(failures, validationErr{ + Field: fieldNotificationEndpointUsername, + Msg: "must provide non empty string", + }) + } + case notificationHTTPAuthTypeBearer: + if !n.token.hasValue() { + failures = append(failures, validationErr{ + Field: fieldNotificationEndpointToken, + Msg: "must provide non empty string", + }) + } + case notificationHTTPAuthTypeNone: + default: + failures = append(failures, validationErr{ + Field: fieldType, + Msg: fmt.Sprintf( + "invalid type provided %q; valid type is 1 in [%s, %s, %s]", + n.httpType, + notificationHTTPAuthTypeBasic, + notificationHTTPAuthTypeBearer, + notificationHTTPAuthTypeNone, + ), + }) + } + } + + if len(failures) > 0 { + return []validationErr{ + objectValidationErr(fieldSpec, failures...), + } + } + + return nil +} diff --git a/pkger/service.go b/pkger/service.go index e0cbdbfbcca..43eeca269f6 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -680,18 +680,16 @@ func (s *Service) dryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg, opts s.dryRunChecks(ctx, orgID, state.mChecks) s.dryRunLabels(ctx, orgID, state.mLabels) s.dryRunVariables(ctx, orgID, state.mVariables) + err := s.dryRunNotificationEndpoints(ctx, orgID, state.mEndpoints) + if err != nil { + return Summary{}, Diff{}, nil, ierrors.Wrap(err, "failed to dry run notification endpoints") + } var diff Diff diff.Dashboards = s.dryRunDashboards(pkg) diff.Tasks = s.dryRunTasks(pkg) diff.Telegrafs = s.dryRunTelegraf(pkg) - diffEndpoints, err := s.dryRunNotificationEndpoints(ctx, orgID, pkg) - if err != nil { - return Summary{}, Diff{}, nil, err - } - diff.NotificationEndpoints = diffEndpoints - diffRules, err := s.dryRunNotificationRules(ctx, orgID, pkg) if err != nil { return Summary{}, Diff{}, nil, err @@ -714,11 +712,12 @@ func (s *Service) dryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg, opts diff.Buckets = stateDiff.Buckets diff.Checks = stateDiff.Checks + diff.NotificationEndpoints = stateDiff.NotificationEndpoints diff.Labels = stateDiff.Labels diff.Variables = stateDiff.Variables diff.LabelMappings = append(stateDiff.LabelMappings, diff.LabelMappings...) - return pkg.Summary(), diff, state, parseErr + return newSummaryFromStatePkg(pkg, state), diff, state, parseErr } func (s *Service) dryRunBuckets(ctx context.Context, orgID influxdb.ID, bkts map[string]*stateBucket) { @@ -779,12 +778,12 @@ func (s *Service) dryRunLabels(ctx context.Context, orgID influxdb.ID, labels ma } } -func (s *Service) dryRunNotificationEndpoints(ctx context.Context, orgID influxdb.ID, pkg *Pkg) ([]DiffNotificationEndpoint, error) { +func (s *Service) dryRunNotificationEndpoints(ctx context.Context, orgID influxdb.ID, endpoints map[string]*stateEndpoint) error { existingEndpoints, _, err := s.endpointSVC.FindNotificationEndpoints(ctx, influxdb.NotificationEndpointFilter{ OrgID: &orgID, }) // grab em all if err != nil { - return nil, internalErr(err) + return internalErr(err) } mExistingByName := make(map[string]influxdb.NotificationEndpoint) @@ -795,35 +794,25 @@ func (s *Service) dryRunNotificationEndpoints(ctx context.Context, orgID influxd mExistingByID[e.GetID()] = e } - findEndpoint := func(e *notificationEndpoint) influxdb.NotificationEndpoint { + findEndpoint := func(e *stateEndpoint) influxdb.NotificationEndpoint { if iExisting, ok := mExistingByID[e.ID()]; ok { return iExisting } - if iExisting, ok := mExistingByName[e.Name()]; ok { + if iExisting, ok := mExistingByName[e.parserEndpoint.Name()]; ok { return iExisting } return nil } - mExistingToNew := make(map[string]DiffNotificationEndpoint) - endpoints := pkg.notificationEndpoints() - for i := range endpoints { - newEndpoint := endpoints[i] - + for _, newEndpoint := range endpoints { existing := findEndpoint(newEndpoint) + if IsNew(newEndpoint.stateStatus) && existing != nil { + newEndpoint.stateStatus = StateStatusExists + } newEndpoint.existing = existing - mExistingToNew[newEndpoint.Name()] = newDiffNotificationEndpoint(newEndpoint, existing) - } - - diffs := make([]DiffNotificationEndpoint, 0, len(mExistingToNew)) - for _, diff := range mExistingToNew { - diffs = append(diffs, diff) } - sort.Slice(diffs, func(i, j int) bool { - return diffs[i].PkgName < diffs[j].PkgName - }) - return diffs, nil + return nil } func (s *Service) dryRunNotificationRules(ctx context.Context, orgID influxdb.ID, pkg *Pkg) ([]DiffNotificationRule, error) { @@ -970,7 +959,6 @@ type ( func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg, state *stateCoordinator) ([]DiffLabelMapping, error) { mappers := []labelMappers{ mapperDashboards(pkg.dashboards()), - mapperNotificationEndpoints(pkg.notificationEndpoints()), mapperNotificationRules(pkg.notificationRules()), mapperTasks(pkg.tasks()), mapperTelegrafs(pkg.telegrafs()), @@ -1099,6 +1087,17 @@ func (s *Service) dryRunLabelMappingsV2(ctx context.Context, state *stateCoordin mappings = append(mappings, mm...) } + for _, e := range state.mEndpoints { + if IsRemoval(e.stateStatus) { + continue + } + mm, err := s.dryRunResourceLabelMappingV2(ctx, state, stateLabelsByResName, e) + if err != nil { + return nil, err + } + mappings = append(mappings, mm...) + } + for _, v := range state.mVariables { if IsRemoval(v.stateStatus) { continue @@ -1183,6 +1182,7 @@ func (s *Service) addStackState(ctx context.Context, stackID influxdb.ID, pkg *P KindBucket, KindCheck, KindLabel, + KindNotificationEndpoint, KindVariable, } @@ -1320,7 +1320,7 @@ func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, o s.applyBuckets(ctx, state.buckets()), s.applyChecks(ctx, state.checks()), s.applyDashboards(pkg.dashboards()), - s.applyNotificationEndpoints(ctx, userID, pkg.notificationEndpoints()), + s.applyNotificationEndpoints(ctx, userID, state.endpoints()), s.applyTasks(pkg.tasks()), s.applyTelegrafs(pkg.telegrafs()), }, @@ -1334,7 +1334,7 @@ func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, o // this has to be run after the above primary resources, because it relies on // notification endpoints already being applied. - app, err := s.applyNotificationRulesGenerator(ctx, orgID, pkg) + app, err := s.applyNotificationRulesGenerator(ctx, orgID, pkg, state.endpoints()) if err != nil { return Summary{}, err } @@ -1354,30 +1354,7 @@ func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, o pkg.applySecrets(missingSecrets) - stateSum := state.summary() - - pkgSum := pkg.Summary() - pkgSum.Buckets = stateSum.Buckets - pkgSum.Checks = stateSum.Checks - pkgSum.Labels = stateSum.Labels - pkgSum.Variables = stateSum.Variables - - // filter out label mappings that are from pgk and replace with those - // in state. This is temporary hack to provide a bridge to the promise land... - resourcesToSkip := map[influxdb.ResourceType]bool{ - influxdb.BucketsResourceType: true, - influxdb.ChecksResourceType: true, - influxdb.VariablesResourceType: true, - } - for _, lm := range pkgSum.LabelMappings { - if resourcesToSkip[lm.ResourceType] { - continue - } - stateSum.LabelMappings = append(stateSum.LabelMappings, lm) - } - pkgSum.LabelMappings = stateSum.LabelMappings - - return pkgSum, nil + return newSummaryFromStatePkg(pkg, state), nil } func (s *Service) applyBuckets(ctx context.Context, buckets []*stateBucket) applier { @@ -1784,23 +1761,23 @@ func (s *Service) applyLabel(ctx context.Context, l *stateLabel) (influxdb.Label return *influxLabel, nil } -func (s *Service) applyNotificationEndpoints(ctx context.Context, userID influxdb.ID, endpoints []*notificationEndpoint) applier { +func (s *Service) applyNotificationEndpoints(ctx context.Context, userID influxdb.ID, endpoints []*stateEndpoint) applier { const resource = "notification_endpoints" mutex := new(doMutex) - rollbackEndpoints := make([]*notificationEndpoint, 0, len(endpoints)) + rollbackEndpoints := make([]*stateEndpoint, 0, len(endpoints)) createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody { - var endpoint notificationEndpoint + var endpoint *stateEndpoint mutex.Do(func() { - endpoints[i].OrgID = orgID - endpoint = *endpoints[i] + endpoints[i].orgID = orgID + endpoint = endpoints[i] }) influxEndpoint, err := s.applyNotificationEndpoint(ctx, endpoint, userID) if err != nil { return &applyErrBody{ - name: endpoint.Name(), + name: endpoint.parserEndpoint.Name(), msg: err.Error(), } } @@ -1810,25 +1787,25 @@ func (s *Service) applyNotificationEndpoints(ctx context.Context, userID influxd for _, secret := range influxEndpoint.SecretFields() { switch { case strings.HasSuffix(secret.Key, "-routing-key"): - if endpoints[i].routingKey == nil { - endpoints[i].routingKey = new(references) + if endpoints[i].parserEndpoint.routingKey == nil { + endpoints[i].parserEndpoint.routingKey = new(references) } - endpoints[i].routingKey.Secret = secret.Key + endpoints[i].parserEndpoint.routingKey.Secret = secret.Key case strings.HasSuffix(secret.Key, "-token"): - if endpoints[i].token == nil { - endpoints[i].token = new(references) + if endpoints[i].parserEndpoint.token == nil { + endpoints[i].parserEndpoint.token = new(references) } - endpoints[i].token.Secret = secret.Key + endpoints[i].parserEndpoint.token.Secret = secret.Key case strings.HasSuffix(secret.Key, "-username"): - if endpoints[i].username == nil { - endpoints[i].username = new(references) + if endpoints[i].parserEndpoint.username == nil { + endpoints[i].parserEndpoint.username = new(references) } - endpoints[i].username.Secret = secret.Key + endpoints[i].parserEndpoint.username.Secret = secret.Key case strings.HasSuffix(secret.Key, "-password"): - if endpoints[i].password == nil { - endpoints[i].password = new(references) + if endpoints[i].parserEndpoint.password == nil { + endpoints[i].parserEndpoint.password = new(references) } - endpoints[i].password.Secret = secret.Key + endpoints[i].parserEndpoint.password.Secret = secret.Key } } rollbackEndpoints = append(rollbackEndpoints, endpoints[i]) @@ -1851,16 +1828,15 @@ func (s *Service) applyNotificationEndpoints(ctx context.Context, userID influxd } } -func (s *Service) applyNotificationEndpoint(ctx context.Context, e notificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) { - if e.shouldRemove { +func (s *Service) applyNotificationEndpoint(ctx context.Context, e *stateEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) { + switch e.stateStatus { + case StateStatusRemove: _, _, err := s.endpointSVC.DeleteNotificationEndpoint(ctx, e.ID()) if err != nil { return nil, err } return e.existing, nil - } - - if e.Exists() { + case StateStatusExists: // stub out userID since we're always using hte http client which will fill it in for us with the token // feels a bit broken that is required. // TODO: look into this userID requirement @@ -1870,27 +1846,30 @@ func (s *Service) applyNotificationEndpoint(ctx context.Context, e notificationE e.summarize().NotificationEndpoint, userID, ) - } + default: + actual := e.summarize().NotificationEndpoint + err := s.endpointSVC.CreateNotificationEndpoint(ctx, actual, userID) + if err != nil { + return nil, err + } - actual := e.summarize().NotificationEndpoint - err := s.endpointSVC.CreateNotificationEndpoint(ctx, actual, userID) - if err != nil { - return nil, err + return actual, nil } - - return actual, nil } -func (s *Service) rollbackNotificationEndpoints(ctx context.Context, userID influxdb.ID, endpoints []*notificationEndpoint) error { - rollbackFn := func(e *notificationEndpoint) error { +func (s *Service) rollbackNotificationEndpoints(ctx context.Context, userID influxdb.ID, endpoints []*stateEndpoint) error { + rollbackFn := func(e *stateEndpoint) error { var err error - switch { - case e.shouldRemove: + switch e.stateStatus { + case StateStatusRemove: err = s.endpointSVC.CreateNotificationEndpoint(ctx, e.existing, userID) - case e.existing == nil: - _, _, err = s.endpointSVC.DeleteNotificationEndpoint(ctx, e.ID()) - default: + err = ierrors.Wrap(err, "failed to rollback removed endpoint") + case StateStatusExists: _, err = s.endpointSVC.UpdateNotificationEndpoint(ctx, e.ID(), e.existing, userID) + err = ierrors.Wrap(err, "failed to rollback updated endpoint") + default: + _, _, err = s.endpointSVC.DeleteNotificationEndpoint(ctx, e.ID()) + err = ierrors.Wrap(err, "failed to rollback created endpoint") } return err } @@ -1909,7 +1888,7 @@ func (s *Service) rollbackNotificationEndpoints(ctx context.Context, userID infl return nil } -func (s *Service) applyNotificationRulesGenerator(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (applier, error) { +func (s *Service) applyNotificationRulesGenerator(ctx context.Context, orgID influxdb.ID, pkg *Pkg, stateEndpoints []*stateEndpoint) (applier, error) { endpoints, _, err := s.endpointSVC.FindNotificationEndpoints(ctx, influxdb.NotificationEndpointFilter{ OrgID: &orgID, }) @@ -1928,15 +1907,15 @@ func (s *Service) applyNotificationRulesGenerator(ctx context.Context, orgID inf eType: e.Type(), } } - for _, e := range pkg.notificationEndpoints() { - if e.shouldRemove { + for _, e := range stateEndpoints { + if IsRemoval(e.stateStatus) { continue } - if _, ok := mEndpointsByPkgName[e.PkgName()]; ok { + if _, ok := mEndpointsByPkgName[e.parserEndpoint.PkgName()]; ok { continue } - mEndpointsByPkgName[e.PkgName()] = mVal{ + mEndpointsByPkgName[e.parserEndpoint.PkgName()] = mVal{ id: e.ID(), eType: e.summarize().NotificationEndpoint.Type(), } @@ -2469,15 +2448,15 @@ func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb. Name: c.parserCheck.PkgName(), }) } - for _, n := range pkg.notificationEndpoints() { - if n.shouldRemove { + for _, n := range state.mEndpoints { + if IsRemoval(n.stateStatus) { continue } stackResources = append(stackResources, StackResource{ APIVersion: APIVersion, ID: n.ID(), Kind: KindNotificationEndpoint, - Name: n.PkgName(), + Name: n.parserEndpoint.PkgName(), }) } for _, l := range state.mLabels { @@ -2547,13 +2526,11 @@ func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb res.ID = c.existing.GetID() } } - for _, e := range pkg.notificationEndpoints() { - if e.shouldRemove { - res := existingResources[newKey(KindNotificationEndpoint, e.PkgName())] - if res.ID != e.ID() { - hasChanges = true - res.ID = e.existing.GetID() - } + for _, e := range state.mEndpoints { + res, ok := existingResources[newKey(KindNotificationEndpoint, e.parserEndpoint.PkgName())] + if ok && res.ID != e.ID() { + hasChanges = true + res.ID = e.existing.GetID() } } for _, l := range state.mLabels { @@ -2624,6 +2601,36 @@ func (s *Service) getAllPlatformVariables(ctx context.Context, orgID influxdb.ID return existingVars, nil } +// temporary hack while integrations are needed. +func newSummaryFromStatePkg(pkg *Pkg, state *stateCoordinator) Summary { + stateSum := state.summary() + + pkgSum := pkg.Summary() + pkgSum.Buckets = stateSum.Buckets + pkgSum.Checks = stateSum.Checks + pkgSum.NotificationEndpoints = stateSum.NotificationEndpoints + pkgSum.Labels = stateSum.Labels + pkgSum.Variables = stateSum.Variables + + // filter out label mappings that are from pgk and replace with those + // in state. This is temporary hack to provide a bridge to the promise land... + resourcesToSkip := map[influxdb.ResourceType]bool{ + influxdb.BucketsResourceType: true, + influxdb.ChecksResourceType: true, + influxdb.NotificationEndpointResourceType: true, + influxdb.VariablesResourceType: true, + } + for _, lm := range pkgSum.LabelMappings { + if resourcesToSkip[lm.ResourceType] { + continue + } + stateSum.LabelMappings = append(stateSum.LabelMappings, lm) + } + pkgSum.LabelMappings = stateSum.LabelMappings + + return pkgSum +} + func getLabelIDMap(ctx context.Context, labelSVC influxdb.LabelService, labelNames []string) (map[influxdb.ID]bool, error) { mLabelIDs := make(map[influxdb.ID]bool) for _, labelName := range labelNames { diff --git a/pkger/service_models.go b/pkger/service_models.go index 7df743e91ce..f69546f6712 100644 --- a/pkger/service_models.go +++ b/pkger/service_models.go @@ -10,6 +10,7 @@ import ( type stateCoordinator struct { mBuckets map[string]*stateBucket mChecks map[string]*stateCheck + mEndpoints map[string]*stateEndpoint mLabels map[string]*stateLabel mVariables map[string]*stateVariable @@ -20,6 +21,7 @@ func newStateCoordinator(pkg *Pkg) *stateCoordinator { state := stateCoordinator{ mBuckets: make(map[string]*stateBucket), mChecks: make(map[string]*stateCheck), + mEndpoints: make(map[string]*stateEndpoint), mLabels: make(map[string]*stateLabel), mVariables: make(map[string]*stateVariable), } @@ -36,6 +38,12 @@ func newStateCoordinator(pkg *Pkg) *stateCoordinator { stateStatus: StateStatusNew, } } + for _, pkgEndpoint := range pkg.notificationEndpoints() { + state.mEndpoints[pkgEndpoint.PkgName()] = &stateEndpoint{ + parserEndpoint: pkgEndpoint, + stateStatus: StateStatusNew, + } + } for _, pkgLabel := range pkg.labels() { state.mLabels[pkgLabel.PkgName()] = &stateLabel{ parserLabel: pkgLabel, @@ -68,6 +76,14 @@ func (s *stateCoordinator) checks() []*stateCheck { return out } +func (s *stateCoordinator) endpoints() []*stateEndpoint { + out := make([]*stateEndpoint, 0, len(s.mEndpoints)) + for _, e := range s.mEndpoints { + out = append(out, e) + } + return out +} + func (s *stateCoordinator) labels() []*stateLabel { out := make([]*stateLabel, 0, len(s.mLabels)) for _, v := range s.mLabels { @@ -100,6 +116,13 @@ func (s *stateCoordinator) diff() Diff { return diff.Checks[i].PkgName < diff.Checks[j].PkgName }) + for _, e := range s.mEndpoints { + diff.NotificationEndpoints = append(diff.NotificationEndpoints, e.diffEndpoint()) + } + sort.Slice(diff.NotificationEndpoints, func(i, j int) bool { + return diff.NotificationEndpoints[i].PkgName < diff.NotificationEndpoints[j].PkgName + }) + for _, l := range s.mLabels { diff.Labels = append(diff.Labels, l.diffLabel()) } @@ -159,6 +182,13 @@ func (s *stateCoordinator) summary() Summary { return sum.Checks[i].PkgName < sum.Checks[j].PkgName }) + for _, e := range s.mEndpoints { + if IsRemoval(e.stateStatus) { + continue + } + sum.NotificationEndpoints = append(sum.NotificationEndpoints, e.summarize()) + } + for _, v := range s.mLabels { if IsRemoval(v.stateStatus) { continue @@ -242,6 +272,15 @@ func (s *stateCoordinator) addObjectForRemoval(k Kind, pkgName string, id influx parserLabel: &label{identity: newIdentity}, stateStatus: StateStatusRemove, } + case KindNotificationEndpoint, + KindNotificationEndpointHTTP, + KindNotificationEndpointPagerDuty, + KindNotificationEndpointSlack: + s.mEndpoints[pkgName] = &stateEndpoint{ + id: id, + parserEndpoint: ¬ificationEndpoint{identity: newIdentity}, + stateStatus: StateStatusRemove, + } case KindVariable: s.mVariables[pkgName] = &stateVariable{ id: id, @@ -271,6 +310,15 @@ func (s *stateCoordinator) getObjectIDSetter(k Kind, pkgName string) (func(influ r.id = id r.stateStatus = StateStatusExists }, ok + case KindNotificationEndpoint, + KindNotificationEndpointHTTP, + KindNotificationEndpointPagerDuty, + KindNotificationEndpointSlack: + r, ok := s.mEndpoints[pkgName] + return func(id influxdb.ID) { + r.id = id + r.stateStatus = StateStatusExists + }, ok case KindVariable: r, ok := s.mVariables[pkgName] return func(id influxdb.ID) { @@ -549,6 +597,73 @@ func stateLabelMappingToInfluxLabelMapping(mapping stateLabelMapping) influxdb.L } } +type stateEndpoint struct { + id, orgID influxdb.ID + stateStatus StateStatus + + parserEndpoint *notificationEndpoint + existing influxdb.NotificationEndpoint +} + +func (e *stateEndpoint) ID() influxdb.ID { + if !IsNew(e.stateStatus) && e.existing != nil { + return e.existing.GetID() + } + return e.id +} + +func (e *stateEndpoint) diffEndpoint() DiffNotificationEndpoint { + diff := DiffNotificationEndpoint{ + DiffIdentifier: DiffIdentifier{ + ID: SafeID(e.ID()), + Remove: IsRemoval(e.stateStatus), + StateStatus: e.stateStatus, + PkgName: e.parserEndpoint.PkgName(), + }, + } + if sum := e.summarize(); sum.NotificationEndpoint != nil { + diff.New.NotificationEndpoint = sum.NotificationEndpoint + } + if e.existing != nil { + diff.Old = &DiffNotificationEndpointValues{ + NotificationEndpoint: e.existing, + } + } + return diff +} + +func (e *stateEndpoint) labels() []*label { + return e.parserEndpoint.labels +} + +func (e *stateEndpoint) resourceType() influxdb.ResourceType { + return KindNotificationEndpoint.ResourceType() +} + +func (e *stateEndpoint) stateIdentity() stateIdentity { + return stateIdentity{ + id: e.ID(), + name: e.parserEndpoint.Name(), + pkgName: e.parserEndpoint.PkgName(), + resourceType: e.resourceType(), + stateStatus: e.stateStatus, + } +} + +func (e *stateEndpoint) summarize() SummaryNotificationEndpoint { + sum := e.parserEndpoint.summarize() + if sum.NotificationEndpoint == nil { + return sum + } + if e.ID() != 0 { + sum.NotificationEndpoint.SetID(e.ID()) + } + if e.orgID != 0 { + sum.NotificationEndpoint.SetOrgID(e.orgID) + } + return sum +} + type stateVariable struct { id, orgID influxdb.ID stateStatus StateStatus diff --git a/pkger/service_test.go b/pkger/service_test.go index 48e72e9b397..93a538af744 100644 --- a/pkger/service_test.go +++ b/pkger/service_test.go @@ -301,8 +301,9 @@ func TestService(t *testing.T) { expected := DiffNotificationEndpoint{ DiffIdentifier: DiffIdentifier{ - ID: 1, - PkgName: "http_none_auth_notification_endpoint", + ID: 1, + PkgName: "http_none_auth_notification_endpoint", + StateStatus: StateStatusExists, }, Old: &DiffNotificationEndpointValues{ NotificationEndpoint: existing, @@ -942,6 +943,8 @@ func TestService(t *testing.T) { } testLabelMappingFn := func(t *testing.T, filename string, numExpected int, settersFn func() []ServiceSetterFn) { + t.Helper() + t.Run("applies successfully", func(t *testing.T) { t.Helper() testfileRunner(t, filename, func(t *testing.T, pkg *Pkg) { @@ -1087,19 +1090,22 @@ func TestService(t *testing.T) { }) t.Run("maps notification endpoints with labels", func(t *testing.T) { - testLabelMappingFn( - t, - "testdata/notification_endpoint.yml", - 5, - func() []ServiceSetterFn { - fakeEndpointSVC := mock.NewNotificationEndpointService() - fakeEndpointSVC.CreateNotificationEndpointF = func(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error { - nr.SetID(influxdb.ID(rand.Int())) - return nil - } - return []ServiceSetterFn{WithNotificationEndpointSVC(fakeEndpointSVC)} - }, - ) + opts := func() []ServiceSetterFn { + fakeEndpointSVC := mock.NewNotificationEndpointService() + fakeEndpointSVC.CreateNotificationEndpointF = func(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error { + nr.SetID(influxdb.ID(rand.Int())) + return nil + } + return []ServiceSetterFn{WithNotificationEndpointSVC(fakeEndpointSVC)} + } + + t.Run("applies successfully", func(t *testing.T) { + testLabelMappingV2ApplyFn(t, "testdata/notification_endpoint.yml", 5, opts) + }) + + t.Run("deletes new label mappings on error", func(t *testing.T) { + testLabelMappingV2RollbackFn(t, "testdata/notification_endpoint.yml", 3, opts) + }) }) t.Run("maps notification rules with labels", func(t *testing.T) { @@ -1250,17 +1256,12 @@ func TestService(t *testing.T) { fakeEndpointSVC := mock.NewNotificationEndpointService() fakeEndpointSVC.CreateNotificationEndpointF = func(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error { nr.SetID(influxdb.ID(fakeEndpointSVC.CreateNotificationEndpointCalls.Count() + 1)) - if fakeEndpointSVC.CreateNotificationEndpointCalls.Count() == 5 { + if fakeEndpointSVC.CreateNotificationEndpointCalls.Count() == 3 { return errors.New("hit that kill count") } return nil } - // create some dupes - for name, endpoint := range pkg.mNotificationEndpoints { - pkg.mNotificationEndpoints["copy"+name] = endpoint - } - svc := newTestService(WithNotificationEndpointSVC(fakeEndpointSVC)) orgID := influxdb.ID(9000) @@ -1268,7 +1269,7 @@ func TestService(t *testing.T) { _, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.Error(t, err) - assert.GreaterOrEqual(t, fakeEndpointSVC.DeleteNotificationEndpointCalls.Count(), 5) + assert.GreaterOrEqual(t, fakeEndpointSVC.DeleteNotificationEndpointCalls.Count(), 3) }) }) })