Skip to content

Commit

Permalink
chore(pkger): add stack state management for notification endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
jsteenb2 committed Apr 6, 2020
1 parent 2746a8b commit 793c896
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 48 deletions.
136 changes: 121 additions & 15 deletions cmd/influxd/launcher/pkger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/notification"
"github.com/influxdata/influxdb/v2/notification/check"
"github.com/influxdata/influxdb/v2/notification/endpoint"
"github.com/influxdata/influxdb/v2/pkger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -42,18 +43,6 @@ func TestLauncher_Pkger(t *testing.T) {
return obj
}

newLabelObject := func(pkgName, name, desc, color string) pkger.Object {
obj := pkger.LabelToObject("", influxdb.Label{
Name: name,
Properties: map[string]string{
"color": color,
"description": desc,
},
})
obj.SetMetadataName(pkgName)
return obj
}

newCheckDeadmanObject := func(t *testing.T, pkgName, name string, every time.Duration) pkger.Object {
t.Helper()

Expand All @@ -75,6 +64,33 @@ func TestLauncher_Pkger(t *testing.T) {
return obj
}

newEndpointHTTP := func(pkgName, name, description string) pkger.Object {
obj := pkger.NotificationEndpointToObject("", &endpoint.HTTP{
Base: endpoint.Base{
Name: name,
Description: description,
Status: influxdb.Inactive,
},
AuthMethod: "none",
URL: "http://example.com",
Method: "GET",
})
obj.SetMetadataName(pkgName)
return obj
}

newLabelObject := func(pkgName, name, desc, color string) pkger.Object {
obj := pkger.LabelToObject("", influxdb.Label{
Name: name,
Properties: map[string]string{
"color": color,
"description": desc,
},
})
obj.SetMetadataName(pkgName)
return obj
}

newVariableObject := func(pkgName, name, description string) pkger.Object {
obj := pkger.VariableToObject("", influxdb.Variable{
Name: name,
Expand Down Expand Up @@ -122,12 +138,14 @@ func TestLauncher_Pkger(t *testing.T) {
var (
initialBucketPkgName = "rucketeer_1"
initialCheckPkgName = "checkers"
initialEndpointPkgName = "endzo"
initialLabelPkgName = "labelino"
initialVariablePkgName = "laces out dan"
)
initialPkg := newPkg(
newBucketObject(initialBucketPkgName, "display name", "init desc"),
newCheckDeadmanObject(t, initialCheckPkgName, "check_0", time.Minute),
newEndpointHTTP(initialEndpointPkgName, "endpoint_0", "init desc"),
newLabelObject(initialLabelPkgName, "label 1", "init desc", "#222eee"),
newVariableObject(initialVariablePkgName, "var char", "init desc"),
)
Expand All @@ -148,6 +166,10 @@ func TestLauncher_Pkger(t *testing.T) {
assert.NotZero(t, sum.Checks[0].Check.GetID())
assert.Equal(t, "check_0", sum.Checks[0].Check.GetName())

require.Len(t, sum.NotificationEndpoints, 1)
assert.NotZero(t, sum.NotificationEndpoints[0].NotificationEndpoint.GetID())
assert.Equal(t, "endpoint_0", sum.NotificationEndpoints[0].NotificationEndpoint.GetName())

require.Len(t, sum.Labels, 1)
assert.NotZero(t, sum.Labels[0].ID)
assert.Equal(t, "label 1", sum.Labels[0].Name)
Expand All @@ -167,6 +189,9 @@ func TestLauncher_Pkger(t *testing.T) {
actualCheck := resourceCheck.mustGetCheck(t, byName("check_0"))
assert.Equal(t, sum.Checks[0].Check.GetID(), actualCheck.GetID())

actualEndpint := resourceCheck.mustGetEndpoint(t, byName("endpoint_0"))
assert.Equal(t, sum.NotificationEndpoints[0].NotificationEndpoint.GetID(), actualEndpint.GetID())

actualLabel := resourceCheck.mustGetLabel(t, byName("label 1"))
assert.Equal(t, sum.Labels[0].ID, pkger.SafeID(actualLabel.ID))

Expand All @@ -178,15 +203,17 @@ func TestLauncher_Pkger(t *testing.T) {
var (
updateBucketName = "new bucket"
updateCheckName = "new check"
updateEndpointName = "new endpoint"
updateLabelName = "new label"
updateVariableName = "new variable"
)
t.Log("apply pkg with stack id where resources change")
{
updatedPkg := newPkg(
newBucketObject(initialBucketPkgName, updateBucketName, ""),
newLabelObject(initialLabelPkgName, updateLabelName, "", ""),
newCheckDeadmanObject(t, initialCheckPkgName, updateCheckName, time.Hour),
newEndpointHTTP(initialEndpointPkgName, updateEndpointName, ""),
newLabelObject(initialLabelPkgName, updateLabelName, "", ""),
newVariableObject(initialVariablePkgName, updateVariableName, ""),
)
sum, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, updatedPkg, applyOpt)
Expand All @@ -200,6 +227,11 @@ func TestLauncher_Pkger(t *testing.T) {
assert.Equal(t, initialSum.Checks[0].Check.GetID(), sum.Checks[0].Check.GetID())
assert.Equal(t, updateCheckName, sum.Checks[0].Check.GetName())

require.Len(t, sum.NotificationEndpoints, 1)
endpoint := sum.NotificationEndpoints[0].NotificationEndpoint
assert.Equal(t, initialSum.NotificationEndpoints[0].NotificationEndpoint.GetID(), endpoint.GetID())
assert.Equal(t, updateEndpointName, endpoint.GetName())

require.Len(t, sum.Labels, 1)
assert.Equal(t, initialSum.Labels[0].ID, sum.Labels[0].ID)
assert.Equal(t, updateLabelName, sum.Labels[0].Name)
Expand All @@ -216,6 +248,9 @@ func TestLauncher_Pkger(t *testing.T) {
actualCheck := resourceCheck.mustGetCheck(t, byName(updateCheckName))
require.Equal(t, initialSum.Checks[0].Check.GetID(), actualCheck.GetID())

actualEndpoint := resourceCheck.mustGetEndpoint(t, byName(updateEndpointName))
assert.Equal(t, endpoint.GetID(), actualEndpoint.GetID())

actualLabel := resourceCheck.mustGetLabel(t, byName(updateLabelName))
require.Equal(t, initialSum.Labels[0].ID, pkger.SafeID(actualLabel.ID))

Expand Down Expand Up @@ -250,6 +285,7 @@ func TestLauncher_Pkger(t *testing.T) {
newBucketObject("z_rolls_back_too", "", ""),
newLabelObject("z_label_roller", "", "", ""),
newCheckDeadmanObject(t, "z_check", "", time.Hour),
newEndpointHTTP("z_endpoint_rolls_back", "", ""),
newVariableObject("z_var_rolls_back", "", ""),
)
_, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, pkgWithDelete, applyOpt)
Expand All @@ -265,6 +301,9 @@ func TestLauncher_Pkger(t *testing.T) {
_, err := resourceCheck.getCheck(t, byName("z_check"))
require.Error(t, err)

_, err = resourceCheck.getEndpoint(t, byName("z_endpoint_rolls_back"))
require.Error(t, err)

_, err = resourceCheck.getLabel(t, byName("z_label_roller"))
require.Error(t, err)

Expand All @@ -280,6 +319,9 @@ func TestLauncher_Pkger(t *testing.T) {
actualCheck := resourceCheck.mustGetCheck(t, byName(updateCheckName))
assert.NotEqual(t, initialSum.Checks[0].Check.GetID(), actualCheck.GetID())

actualEndpoint := resourceCheck.mustGetEndpoint(t, byName(updateEndpointName))
assert.NotEqual(t, initialSum.NotificationEndpoints[0].NotificationEndpoint.GetID(), actualEndpoint.GetID())

actualLabel := resourceCheck.mustGetLabel(t, byName(updateLabelName))
assert.NotEqual(t, initialSum.Labels[0].ID, pkger.SafeID(actualLabel.ID))

Expand All @@ -292,8 +334,9 @@ func TestLauncher_Pkger(t *testing.T) {
{
allNewResourcesPkg := newPkg(
newBucketObject("non_existent_bucket", "", ""),
newLabelObject("non_existent_label", "", "", ""),
newCheckDeadmanObject(t, "non_existent_check", "", time.Minute),
newEndpointHTTP("non_existent_endpoint", "", ""),
newLabelObject("non_existent_label", "", "", ""),
newVariableObject("non_existent_var", "", ""),
)
sum, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, allNewResourcesPkg, applyOpt)
Expand All @@ -311,6 +354,13 @@ func TestLauncher_Pkger(t *testing.T) {
defer resourceCheck.mustDeleteCheck(t, sum.Checks[0].Check.GetID())
assert.Equal(t, "non_existent_check", sum.Checks[0].Check.GetName())

require.Len(t, sum.NotificationEndpoints, 1)
endpoint := sum.NotificationEndpoints[0].NotificationEndpoint
assert.NotEqual(t, initialSum.NotificationEndpoints[0].NotificationEndpoint.GetID(), endpoint.GetID())
assert.NotZero(t, endpoint.GetID())
defer resourceCheck.mustDeleteEndpoint(t, endpoint.GetID())
assert.Equal(t, "non_existent_endpoint", endpoint.GetName())

require.Len(t, sum.Labels, 1)
assert.NotEqual(t, initialSum.Labels[0].ID, sum.Labels[0].ID)
assert.NotZero(t, sum.Labels[0].ID)
Expand All @@ -333,6 +383,10 @@ func TestLauncher_Pkger(t *testing.T) {
assert.NotEqual(t, initialSum.Checks[0].Check.GetID(), sum.Checks[0].Check.GetID())
assert.Equal(t, chk.GetID(), sum.Checks[0].Check.GetID())

endpoint := resourceCheck.mustGetEndpoint(t, byName("non_existent_endpoint"))
assert.NotEqual(t, initialSum.NotificationEndpoints[0].NotificationEndpoint.GetID(), endpoint.GetID())
assert.Equal(t, endpoint.GetID(), sum.NotificationEndpoints[0].NotificationEndpoint.GetID())

label := resourceCheck.mustGetLabel(t, byName("non_existent_label"))
assert.NotEqual(t, initialSum.Labels[0].ID, sum.Labels[0].ID)
assert.Equal(t, pkger.SafeID(label.ID), sum.Labels[0].ID)
Expand All @@ -350,6 +404,9 @@ func TestLauncher_Pkger(t *testing.T) {
_, err = resourceCheck.getCheck(t, byName(updateCheckName))
require.Error(t, err)

_, err = resourceCheck.getEndpoint(t, byName(updateEndpointName))
require.Error(t, err)

_, err = resourceCheck.getLabel(t, byName(updateLabelName))
require.Error(t, err)

Expand Down Expand Up @@ -1712,6 +1769,56 @@ func (r resourceChecker) mustDeleteCheck(t *testing.T, id influxdb.ID) {
require.NoError(t, r.tl.CheckService().DeleteCheck(ctx, id))
}

func (r resourceChecker) getEndpoint(t *testing.T, getOpt getResourceOptFn) (influxdb.NotificationEndpoint, error) {
t.Helper()

endpointSVC := r.tl.NotificationEndpointService(t)

var (
e influxdb.NotificationEndpoint
err error
)
switch opt := getOpt(); {
case opt.name != "":
var endpoints []influxdb.NotificationEndpoint
endpoints, _, err = endpointSVC.FindNotificationEndpoints(timedCtx(time.Second), influxdb.NotificationEndpointFilter{
OrgID: &r.tl.Org.ID,
})
for _, existing := range endpoints {
if existing.GetName() == opt.name {
e = existing
break
}
}
case opt.id != 0:
e, err = endpointSVC.FindNotificationEndpointByID(timedCtx(time.Second), opt.id)
default:
require.Fail(t, "did not provide any get option")
}

if e == nil {
return nil, errors.New("did not find endpoint")
}

return e, err
}

func (r resourceChecker) mustGetEndpoint(t *testing.T, getOpt getResourceOptFn) influxdb.NotificationEndpoint {
t.Helper()

e, err := r.getEndpoint(t, getOpt)
require.NoError(t, err)
return e
}

func (r resourceChecker) mustDeleteEndpoint(t *testing.T, id influxdb.ID) {
t.Helper()
_, _, err := r.tl.
NotificationEndpointService(t).
DeleteNotificationEndpoint(ctx, id)
require.NoError(t, err)
}

func (r resourceChecker) getLabel(t *testing.T, getOpt getResourceOptFn) (influxdb.Label, error) {
t.Helper()

Expand Down Expand Up @@ -1779,7 +1886,6 @@ func (r resourceChecker) getVariable(t *testing.T, getOpt getResourceOptFn) (inf
}
for i := range vars {
v := vars[i]
t.Logf("got var: %+v", *v)
if v.Name == opt.name {
variable = v
break
Expand Down
7 changes: 5 additions & 2 deletions notification/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (
HTTPType = "http"
)

var typeToEndpoint = map[string](func() influxdb.NotificationEndpoint){
var typeToEndpoint = map[string]func() influxdb.NotificationEndpoint{
SlackType: func() influxdb.NotificationEndpoint { return &Slack{} },
PagerDutyType: func() influxdb.NotificationEndpoint { return &PagerDuty{} },
HTTPType: func() influxdb.NotificationEndpoint { return &HTTP{} },
Expand All @@ -30,13 +30,16 @@ func UnmarshalJSON(b []byte) (influxdb.NotificationEndpoint, error) {
Msg: "unable to detect the notification endpoint type from json",
}
}

convertedFunc, ok := typeToEndpoint[raw.Type]
if !ok {
return nil, &influxdb.Error{
Msg: fmt.Sprintf("invalid notification endpoint type %s", raw.Type),
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("invalid notification endpoint type %s", raw.Type),
}
}
converted := convertedFunc()

if err := json.Unmarshal(b, converted); err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInternal,
Expand Down
6 changes: 3 additions & 3 deletions pkger/clone_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (ex *resourceExporter) resourceCloneToKind(ctx context.Context, r ResourceT
if err != nil {
return err
}
mapResource(e.GetOrgID(), uniqByNameResID, KindNotificationEndpoint, endpointKind(e, r.Name))
mapResource(e.GetOrgID(), uniqByNameResID, KindNotificationEndpoint, NotificationEndpointToObject(r.Name, e))
case r.Kind.is(KindNotificationRule):
rule, ruleEndpoint, err := ex.getEndpointRule(ctx, r.ID)
if err != nil {
Expand All @@ -242,7 +242,7 @@ func (ex *resourceExporter) resourceCloneToKind(ctx context.Context, r ResourceT
endpointKey := newExportKey(ruleEndpoint.GetOrgID(), uniqByNameResID, KindNotificationEndpoint, ruleEndpoint.GetName())
object, ok := ex.mObjects[endpointKey]
if !ok {
mapResource(ruleEndpoint.GetOrgID(), uniqByNameResID, KindNotificationEndpoint, endpointKind(ruleEndpoint, ""))
mapResource(ruleEndpoint.GetOrgID(), uniqByNameResID, KindNotificationEndpoint, NotificationEndpointToObject("", ruleEndpoint))
object = ex.mObjects[endpointKey]
}
endpointObjectName := object.Name()
Expand Down Expand Up @@ -806,7 +806,7 @@ func LabelToObject(name string, l influxdb.Label) Object {
return o
}

func endpointKind(e influxdb.NotificationEndpoint, name string) Object {
func NotificationEndpointToObject(name string, e influxdb.NotificationEndpoint) Object {
if name == "" {
name = e.GetName()
}
Expand Down
24 changes: 20 additions & 4 deletions pkger/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,20 @@ type DiffNotificationEndpointValues struct {
influxdb.NotificationEndpoint
}

// MarshalJSON implementation here is forced by the embedded check value here.
func (d DiffNotificationEndpointValues) MarshalJSON() ([]byte, error) {
if d.NotificationEndpoint == nil {
return json.Marshal(nil)
}
return json.Marshal(d.NotificationEndpoint)
}

// UnmarshalJSON decodes the notification endpoint. This is necessary unfortunately.
func (d *DiffNotificationEndpointValues) UnmarshalJSON(b []byte) (err error) {
d.NotificationEndpoint, err = endpoint.UnmarshalJSON(b)
if influxdb.EInvalid == influxdb.ErrorCode(err) {
return nil
}
return
}

Expand Down Expand Up @@ -1468,7 +1479,7 @@ func (n *notificationEndpoint) base() endpoint.Base {
e := endpoint.Base{
Name: n.Name(),
Description: n.description,
Status: influxdb.Active,
Status: n.influxStatus(),
}
if id := n.ID(); id > 0 {
e.ID = &id
Expand All @@ -1481,9 +1492,6 @@ func (n *notificationEndpoint) base() endpoint.Base {

func (n *notificationEndpoint) summarize() SummaryNotificationEndpoint {
base := n.base()
if n.status != "" {
base.Status = influxdb.Status(n.status)
}
sum := SummaryNotificationEndpoint{
PkgName: n.PkgName(),
LabelAssociations: toSummaryLabels(n.labels...),
Expand Down Expand Up @@ -1524,6 +1532,14 @@ func (n *notificationEndpoint) summarize() SummaryNotificationEndpoint {
return sum
}

func (n *notificationEndpoint) influxStatus() influxdb.Status {
status := influxdb.Active
if n.status != "" {
status = influxdb.Status(n.status)
}
return status
}

var validEndpointHTTPMethods = map[string]bool{
"DELETE": true,
"GET": true,
Expand Down
3 changes: 3 additions & 0 deletions pkger/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ func (p *Pkg) Summary() Summary {
sum.LabelMappings = p.labelMappings()

for _, n := range p.notificationEndpoints() {
if n.shouldRemove {
continue
}
sum.NotificationEndpoints = append(sum.NotificationEndpoints, n.summarize())
}

Expand Down
Loading

0 comments on commit 793c896

Please sign in to comment.