Skip to content

Commit

Permalink
chore: Add Inventory to TaskContext
Browse files Browse the repository at this point in the history
- Add a new Inventory KRM object for storing the spec and status
  of the inventory objects in memory.
- Improve reconcile, apply, & delete status tracking in the
  TaskContext/Inventory to cover all possible statuses
- Move most of the convenience methods from the TaskContext into a
  new inventory.Manager.
- Fix a minor bug where object UID might have drifted (delete &
  recreate) between GET and DELETE.
  • Loading branch information
karlkfi committed Feb 3, 2022
1 parent a5b5c83 commit 5c095e8
Show file tree
Hide file tree
Showing 22 changed files with 1,019 additions and 245 deletions.
24 changes: 20 additions & 4 deletions pkg/apply/prune/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ func (p *Pruner) Prune(
for _, obj := range objs {
id := object.UnstructuredToObjMetadata(obj)
klog.V(5).Infof("evaluating prune filters (object: %q)", id)

// UID will change if the object is deleted and re-created.
uid := obj.GetUID()
if uid == "" {
err := object.NotFound([]interface{}{"metadata", "uid"}, "")
taskContext.SendEvent(eventFactory.CreateFailedEvent(id, err))
taskContext.InventoryManager().AddFailedDelete(id)
continue
}

// Check filters to see if we're prevented from pruning/deleting object.
var filtered bool
var reason string
Expand All @@ -111,7 +121,7 @@ func (p *Pruner) Prune(
klog.Errorf("error during %s, (%s): %v", pruneFilter.Name(), id, err)
}
taskContext.SendEvent(eventFactory.CreateFailedEvent(id, err))
taskContext.AddFailedDelete(id)
taskContext.InventoryManager().AddFailedDelete(id)
break
}
if filtered {
Expand All @@ -125,7 +135,7 @@ func (p *Pruner) Prune(
klog.Errorf("error removing annotation (object: %q, annotation: %q): %v", id, inventory.OwningInventoryKey, err)
}
taskContext.SendEvent(eventFactory.CreateFailedEvent(id, err))
taskContext.AddFailedDelete(id)
taskContext.InventoryManager().AddFailedDelete(id)
break
} else {
// Inventory annotation was successfully removed from the object.
Expand All @@ -135,7 +145,7 @@ func (p *Pruner) Prune(
}
}
taskContext.SendEvent(eventFactory.CreateSkippedEvent(obj, reason))
taskContext.AddSkippedDelete(id)
taskContext.InventoryManager().AddSkippedDelete(id)
break
}
}
Expand All @@ -146,17 +156,23 @@ func (p *Pruner) Prune(
if !opts.DryRunStrategy.ClientOrServerDryRun() {
klog.V(4).Infof("deleting object (object: %q)", id)
err := p.deleteObject(id, metav1.DeleteOptions{
// Only delete the resource if it hasn't already been deleted
// and recreated since the last GET. Otherwise error.
Preconditions: &metav1.Preconditions{
UID: &uid,
},
PropagationPolicy: &opts.PropagationPolicy,
})
if err != nil {
if klog.V(4).Enabled() {
klog.Errorf("error deleting object (object: %q): %v", id, err)
}
taskContext.SendEvent(eventFactory.CreateFailedEvent(id, err))
taskContext.AddFailedDelete(id)
taskContext.InventoryManager().AddFailedDelete(id)
continue
}
}
taskContext.InventoryManager().AddSuccessfulDelete(id, obj.GetUID())
taskContext.SendEvent(eventFactory.CreateSuccessEvent(obj))
}
return nil
Expand Down
16 changes: 10 additions & 6 deletions pkg/apply/prune/prune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,19 +482,21 @@ func TestPrune(t *testing.T) {
err = testutil.VerifyEvents(tc.expectedEvents, actualEvents)
assert.NoError(t, err)

im := taskContext.InventoryManager()

// validate record of failed prunes
for _, id := range tc.expectedFailed {
assert.Truef(t, taskContext.IsFailedDelete(id), "Prune() should mark object as failed: %s", id)
assert.Truef(t, im.IsFailedDelete(id), "Prune() should mark object as failed: %s", id)
}
for _, id := range pruneIds.Diff(tc.expectedFailed) {
assert.Falsef(t, taskContext.IsFailedDelete(id), "Prune() should NOT mark object as failed: %s", id)
assert.Falsef(t, im.IsFailedDelete(id), "Prune() should NOT mark object as failed: %s", id)
}
// validate record of skipped prunes
for _, id := range tc.expectedSkipped {
assert.Truef(t, taskContext.IsSkippedDelete(id), "Prune() should mark object as skipped: %s", id)
assert.Truef(t, im.IsSkippedDelete(id), "Prune() should mark object as skipped: %s", id)
}
for _, id := range pruneIds.Diff(tc.expectedSkipped) {
assert.Falsef(t, taskContext.IsSkippedDelete(id), "Prune() should NOT mark object as skipped: %s", id)
assert.Falsef(t, im.IsSkippedDelete(id), "Prune() should NOT mark object as skipped: %s", id)
}
// validate record of abandoned objects
for _, id := range tc.expectedAbandoned {
Expand Down Expand Up @@ -561,9 +563,11 @@ func TestPruneDeletionPrevention(t *testing.T) {
}
}

im := taskContext.InventoryManager()

assert.Truef(t, taskContext.IsAbandonedObject(pruneID), "Prune() should mark object as abandoned")
assert.Truef(t, taskContext.IsSkippedDelete(pruneID), "Prune() should mark object as skipped")
assert.Falsef(t, taskContext.IsFailedDelete(pruneID), "Prune() should NOT mark object as failed")
assert.Truef(t, im.IsSkippedDelete(pruneID), "Prune() should mark object as skipped")
assert.Falsef(t, im.IsFailedDelete(pruneID), "Prune() should NOT mark object as failed")
})
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/apply/task/apply_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
id,
applyerror.NewUnknownTypeError(err),
))
taskContext.AddFailedApply(id)
taskContext.InventoryManager().AddFailedApply(id)
continue
}

Expand All @@ -122,13 +122,13 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
klog.Errorf("error during %s, (%s): %s", filter.Name(), id, filterErr)
}
taskContext.SendEvent(a.createApplyFailedEvent(id, filterErr))
taskContext.AddFailedApply(id)
taskContext.InventoryManager().AddFailedApply(id)
break
}
if filtered {
klog.V(4).Infof("apply filtered (filter: %q, resource: %q, reason: %q)", filter.Name(), id, reason)
taskContext.SendEvent(a.createApplyEvent(id, event.Unchanged, obj))
taskContext.AddSkippedApply(id)
taskContext.InventoryManager().AddSkippedApply(id)
break
}
}
Expand All @@ -143,7 +143,7 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
klog.Errorf("error mutating: %w", err)
}
taskContext.SendEvent(a.createApplyFailedEvent(id, err))
taskContext.AddFailedApply(id)
taskContext.InventoryManager().AddFailedApply(id)
continue
}

Expand All @@ -168,13 +168,13 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
id,
applyerror.NewApplyRunError(err),
))
taskContext.AddFailedApply(id)
taskContext.InventoryManager().AddFailedApply(id)
} else if info.Object != nil {
acc, err := meta.Accessor(info.Object)
if err == nil {
uid := acc.GetUID()
gen := acc.GetGeneration()
taskContext.AddSuccessfulApply(id, uid, gen)
taskContext.InventoryManager().AddSuccessfulApply(id, uid, gen)
}
}
}
Expand Down
22 changes: 13 additions & 9 deletions pkg/apply/task/apply_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,16 @@ func TestApplyTask_BasicAppliedObjects(t *testing.T) {
// The applied resources should be stored in the TaskContext
// for the final inventory.
expectedIDs := object.UnstructuredSetToObjMetadataSet(objs)
actual := taskContext.SuccessfulApplies()
actual := taskContext.InventoryManager().SuccessfulApplies()
if !actual.Equal(expectedIDs) {
t.Errorf("expected (%s) inventory resources, got (%s)", expectedIDs, actual)
}

im := taskContext.InventoryManager()

for _, id := range expectedIDs {
assert.Falsef(t, taskContext.IsFailedApply(id), "ApplyTask should NOT mark object as failed: %s", id)
assert.Falsef(t, taskContext.IsSkippedApply(id), "ApplyTask should NOT mark object as skipped: %s", id)
assert.Falsef(t, im.IsFailedApply(id), "ApplyTask should NOT mark object as failed: %s", id)
assert.Falsef(t, im.IsSkippedApply(id), "ApplyTask should NOT mark object as skipped: %s", id)
}
})
}
Expand Down Expand Up @@ -198,10 +200,10 @@ func TestApplyTask_FetchGeneration(t *testing.T) {
Name: info.name,
Namespace: info.namespace,
}
uid, _ := taskContext.AppliedResourceUID(id)
uid, _ := taskContext.InventoryManager().AppliedResourceUID(id)
assert.Equal(t, info.uid, uid)

gen, _ := taskContext.AppliedGeneration(id)
gen, _ := taskContext.InventoryManager().AppliedGeneration(id)
assert.Equal(t, info.generation, gen)
}
})
Expand Down Expand Up @@ -491,19 +493,21 @@ func TestApplyTaskWithError(t *testing.T) {

applyIds := object.UnstructuredSetToObjMetadataSet(tc.objs)

im := taskContext.InventoryManager()

// validate record of failed prunes
for _, id := range tc.expectedFailed {
assert.Truef(t, taskContext.IsFailedApply(id), "ApplyTask should mark object as failed: %s", id)
assert.Truef(t, im.IsFailedApply(id), "ApplyTask should mark object as failed: %s", id)
}
for _, id := range applyIds.Diff(tc.expectedFailed) {
assert.Falsef(t, taskContext.IsFailedApply(id), "ApplyTask should NOT mark object as failed: %s", id)
assert.Falsef(t, im.IsFailedApply(id), "ApplyTask should NOT mark object as failed: %s", id)
}
// validate record of skipped prunes
for _, id := range tc.expectedSkipped {
assert.Truef(t, taskContext.IsSkippedApply(id), "ApplyTask should mark object as skipped: %s", id)
assert.Truef(t, im.IsSkippedApply(id), "ApplyTask should mark object as skipped: %s", id)
}
for _, id := range applyIds.Diff(tc.expectedSkipped) {
assert.Falsef(t, taskContext.IsSkippedApply(id), "ApplyTask should NOT mark object as skipped: %s", id)
assert.Falsef(t, im.IsSkippedApply(id), "ApplyTask should NOT mark object as skipped: %s", id)
}
})
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/apply/task/inv_set_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ func (i *InvSetTask) Start(taskContext *taskrunner.TaskContext) {
klog.V(2).Infof("inventory set task starting (name: %q)", i.Name())
invObjs := object.ObjMetadataSet{}

// TODO: Just use InventoryManager.Store()
im := taskContext.InventoryManager()

// If an object applied successfully, keep or add it to the inventory.
appliedObjs := taskContext.SuccessfulApplies()
appliedObjs := im.SuccessfulApplies()
klog.V(4).Infof("set inventory %d successful applies", len(appliedObjs))
invObjs = invObjs.Union(appliedObjs)

Expand All @@ -69,7 +72,7 @@ func (i *InvSetTask) Start(taskContext *taskrunner.TaskContext) {
// This will remove new resources that failed to apply from the inventory,
// because even tho they were added by InvAddTask, the PrevInventory
// represents the inventory before the pipeline has run.
applyFailures := i.PrevInventory.Intersection(taskContext.FailedApplies())
applyFailures := i.PrevInventory.Intersection(im.FailedApplies())
klog.V(4).Infof("keep in inventory %d failed applies", len(applyFailures))
invObjs = invObjs.Union(applyFailures)

Expand All @@ -78,7 +81,7 @@ func (i *InvSetTask) Start(taskContext *taskrunner.TaskContext) {
// It's likely that all the skipped applies are already in the inventory,
// because the apply filters all currently depend on cluster state,
// but we're doing the intersection anyway just to be sure.
applySkips := i.PrevInventory.Intersection(taskContext.SkippedApplies())
applySkips := i.PrevInventory.Intersection(im.SkippedApplies())
klog.V(4).Infof("keep in inventory %d skipped applies", len(applySkips))
invObjs = invObjs.Union(applySkips)

Expand All @@ -87,7 +90,7 @@ func (i *InvSetTask) Start(taskContext *taskrunner.TaskContext) {
// It's likely that all the delete failures are already in the inventory,
// because the set of resources to prune comes from the inventory,
// but we're doing the intersection anyway just to be sure.
pruneFailures := i.PrevInventory.Intersection(taskContext.FailedDeletes())
pruneFailures := i.PrevInventory.Intersection(im.FailedDeletes())
klog.V(4).Infof("set inventory %d failed prunes", len(pruneFailures))
invObjs = invObjs.Union(pruneFailures)

Expand All @@ -96,7 +99,7 @@ func (i *InvSetTask) Start(taskContext *taskrunner.TaskContext) {
// It's likely that all the skipped deletes are already in the inventory,
// because the set of resources to prune comes from the inventory,
// but we're doing the intersection anyway just to be sure.
pruneSkips := i.PrevInventory.Intersection(taskContext.SkippedDeletes())
pruneSkips := i.PrevInventory.Intersection(im.SkippedDeletes())
klog.V(4).Infof("keep in inventory %d skipped prunes", len(pruneSkips))
invObjs = invObjs.Union(pruneSkips)

Expand Down
11 changes: 6 additions & 5 deletions pkg/apply/task/inv_set_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,21 @@ func TestInvSetTask(t *testing.T) {
InvInfo: nil,
PrevInventory: tc.prevInventory,
}
im := context.InventoryManager()
for _, applyObj := range tc.appliedObjs {
context.AddSuccessfulApply(applyObj, "unusued-uid", int64(0))
im.AddSuccessfulApply(applyObj, "unusued-uid", int64(0))
}
for _, applyFailure := range tc.failedApplies {
context.AddFailedApply(applyFailure)
im.AddFailedApply(applyFailure)
}
for _, pruneObj := range tc.failedDeletes {
context.AddFailedDelete(pruneObj)
im.AddFailedDelete(pruneObj)
}
for _, skippedApply := range tc.skippedApplies {
context.AddSkippedApply(skippedApply)
im.AddSkippedApply(skippedApply)
}
for _, skippedDelete := range tc.skippedDeletes {
context.AddSkippedDelete(skippedDelete)
im.AddSkippedDelete(skippedDelete)
}
for _, abandonedObj := range tc.abandonedObjs {
context.AddAbandonedObject(abandonedObj)
Expand Down
2 changes: 1 addition & 1 deletion pkg/apply/task/prune_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (p *PruneTask) Start(taskContext *taskrunner.TaskContext) {
// Create filter to prevent deletion of currently applied
// objects. Must be done here to wait for applied UIDs.
uidFilter := filter.CurrentUIDFilter{
CurrentUIDs: taskContext.AppliedResourceUIDs(),
CurrentUIDs: taskContext.InventoryManager().AppliedResourceUIDs(),
}
p.Filters = append(p.Filters, uidFilter)
err := p.Pruner.Prune(
Expand Down
4 changes: 2 additions & 2 deletions pkg/apply/taskrunner/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func allMatchStatus(taskContext *TaskContext, ids object.ObjMetadataSet, s statu
return false
}

applyGen, _ := taskContext.AppliedGeneration(id) // generation at apply time
applyGen, _ := taskContext.InventoryManager().AppliedGeneration(id) // generation at apply time
cachedGen := int64(0)
if cached.Resource != nil {
cachedGen = cached.Resource.GetGeneration()
Expand All @@ -81,7 +81,7 @@ func noneMatchStatus(taskContext *TaskContext, ids object.ObjMetadataSet, s stat
return false
}

applyGen, _ := taskContext.AppliedGeneration(id) // generation at apply time
applyGen, _ := taskContext.InventoryManager().AppliedGeneration(id) // generation at apply time
cachedGen := int64(0)
if cached.Resource != nil {
cachedGen = cached.Resource.GetGeneration()
Expand Down
2 changes: 1 addition & 1 deletion pkg/apply/taskrunner/condition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestCollector_ConditionMet(t *testing.T) {

if tc.appliedGen != nil {
for id, gen := range tc.appliedGen {
taskContext.AddSuccessfulApply(id, types.UID("unused"), gen)
taskContext.InventoryManager().AddSuccessfulApply(id, types.UID("unused"), gen)
}
}

Expand Down
Loading

0 comments on commit 5c095e8

Please sign in to comment.