Skip to content

Commit

Permalink
chore(pkger): handle edge cases for tasks that manifest from user int…
Browse files Browse the repository at this point in the history
…eraction

references: #17434
  • Loading branch information
jsteenb2 committed May 6, 2020
1 parent 7be0d82 commit 5a304cf
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 3 deletions.
42 changes: 42 additions & 0 deletions cmd/influxd/launcher/pkger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,42 @@ func TestLauncher_Pkger(t *testing.T) {
})
})
})

t.Run("when a user has deleted a task that was previously created by a stack", func(t *testing.T) {
testUserDeletedTask := func(t *testing.T, actionFn func(t *testing.T, stackID influxdb.ID, initialObj pkger.Object, initialSum pkger.Summary)) {
t.Helper()

obj := newTaskObject("task-1", "", "")
stackID, cleanup, initialSum := initializeStackPkg(t, newPkg(obj))
defer cleanup()

require.Len(t, initialSum.Tasks, 1)
require.NotZero(t, initialSum.Tasks[0].ID)
resourceCheck.mustDeleteTask(t, influxdb.ID(initialSum.Tasks[0].ID))

actionFn(t, stackID, obj, initialSum)
}

t.Run("should create new resource when attempting to update", func(t *testing.T) {
testUserDeletedTask(t, func(t *testing.T, stackID influxdb.ID, initialObj pkger.Object, initialSum pkger.Summary) {
pkg := newPkg(initialObj)
updateSum, _, err := svc.Apply(ctx, l.Org.ID, l.User.ID, pkg, pkger.ApplyWithStackID(stackID))
require.NoError(t, err)

require.Len(t, updateSum.Tasks, 1)
initial, updated := initialSum.Tasks[0], updateSum.Tasks[0]
assert.NotEqual(t, initial.ID, updated.ID)
initial.ID, updated.ID = 0, 0
assert.Equal(t, initial, updated)
})
})

t.Run("should not error when attempting to remove", func(t *testing.T) {
testUserDeletedTask(t, func(t *testing.T, stackID influxdb.ID, _ pkger.Object, _ pkger.Summary) {
testValidRemoval(t, stackID)
})
})
})
})
})

Expand Down Expand Up @@ -3016,6 +3052,12 @@ func (r resourceChecker) mustGetTask(t *testing.T, getOpt getResourceOptFn) http
return task
}

func (r resourceChecker) mustDeleteTask(t *testing.T, id influxdb.ID) {
t.Helper()

require.NoError(t, r.tl.TaskService(t).DeleteTask(ctx, id))
}

func (r resourceChecker) getTelegrafConfig(t *testing.T, getOpt getResourceOptFn) (influxdb.TelegrafConfig, error) {
t.Helper()

Expand Down
13 changes: 10 additions & 3 deletions pkger/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2183,13 +2183,16 @@ func (s *Service) applyTasks(ctx context.Context, tasks []*stateTask) applier {
}

func (s *Service) applyTask(ctx context.Context, userID influxdb.ID, t *stateTask) (influxdb.Task, error) {
switch t.stateStatus {
case StateStatusRemove:
switch {
case IsRemoval(t.stateStatus):
if err := s.taskSVC.DeleteTask(ctx, t.ID()); err != nil {
if influxdb.ErrorCode(err) == influxdb.ENotFound {
return influxdb.Task{}, nil
}
return influxdb.Task{}, ierrors.Wrap(err, "failed to delete task")
}
return *t.existing, nil
case StateStatusExists:
case IsExisting(t.stateStatus) && t.existing != nil:
newFlux := t.parserTask.flux()
newStatus := string(t.parserTask.Status())
opt := options.Options{
Expand Down Expand Up @@ -2231,6 +2234,10 @@ func (s *Service) applyTask(ctx context.Context, userID influxdb.ID, t *stateTas

func (s *Service) rollbackTasks(ctx context.Context, tasks []*stateTask) error {
rollbackFn := func(t *stateTask) error {
if !IsNew(t.stateStatus) && t.existing == nil {
return nil
}

var err error
switch t.stateStatus {
case StateStatusRemove:
Expand Down

0 comments on commit 5a304cf

Please sign in to comment.