Skip to content

Commit

Permalink
[WIP] feat: Add ValidationPolicy & ValidationEvent
Browse files Browse the repository at this point in the history
- Added ValidationPolicy:
  ExitEarly (default) - exit before apply/prune if any objects are invalid
  SkipInvalid - apply/prune valid objects & skip invalid ones
  ApplyAll - attempt to apply all resources
- Added ValidationEvent to be sent once for every invalid object or set of
  objects, regardless of policy.
- Added generic MultiError to wrap and unwrap a list of errors.
- Simplified ValidationError to be easier to read by optionally wrapping
  MultiError.
- Add edvalidator.NewError to construct ValidationError.
- Modified graph.SortObjs to return a MultiError with all validation failures,
  instead of returning only the first error. This includes validation errors for
  depends-on and apply-time-mutation.
- Modified UnstructuredsToObjMetas & UnstructuredsToObjMetasOrDie to
  return ObjMetadataSet to avoid needing to cast in a few places.
- Moved SortObjs usage from solver to applier/destroyer.
  This allows the resulting errors to be treated as validation errors.
- Added invalid objects to the TestContext so they can be retained in
  the inventory (only if already present). This primarily applies to
  invalid annotations and dependencies. Objects without name or kind
  should never be added to the inventory.
- Removed invalid objects from the applySet/pruneSet after SortObjs.
- Added e2e test for invalid object handling.
- Removed error from ObjMetadataSet.Hash return value. Hashing will
  not error. This makes it easier to use Hash when sorting sets.
- Move validator to object/validation package.

TODO:
- Add more tests
- Update Printers to handle ValidationEvent

BREAKING CHANGE: TaskStatusRunner.Run changed to take TaskContext
  • Loading branch information
karlkfi committed Jan 6, 2022
1 parent b88d9e2 commit 0536309
Show file tree
Hide file tree
Showing 53 changed files with 2,071 additions and 792 deletions.
125 changes: 110 additions & 15 deletions pkg/apply/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ import (
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/multierror"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/graph"
"sigs.k8s.io/cli-utils/pkg/object/validation"
"sigs.k8s.io/cli-utils/pkg/ordering"
)

Expand Down Expand Up @@ -141,22 +144,66 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
return
}

// Validate the resources to make sure we catch those problems early
// before anything has been updated in the cluster.
if err := (&object.Validator{
Mapper: mapper,
}).Validate(objects); err != nil {
handleError(eventChannel, err)
return
// Validate objects
var vErrors []error
validator := &validation.Validator{Mapper: mapper}
if err := validator.Validate(objects); err != nil {
vErrors = append(vErrors, err)
}

// Decide which objects to apply and which to prune
applyObjs, pruneObjs, err := a.prepareObjects(invInfo, objects, options)
if err != nil {
handleError(eventChannel, err)
return
}
klog.V(4).Infof("calculated %d apply objs; %d prune objs", len(applyObjs), len(pruneObjs))

// Sort applies into groups based on dependency ordering
applySets, err := graph.SortObjs(applyObjs)
if err != nil {
vErrors = append(vErrors, err)
}
klog.V(4).Infof("apply sets: %d", len(applySets))

// Sort prunes into groups based on reverse dependency ordering
pruneSets, err := graph.ReverseSortObjs(pruneObjs)
if err != nil {
vErrors = append(vErrors, err)
}
klog.V(4).Infof("prune sets: %d", len(pruneSets))

// Send each validation error as a ValidationEvent
vErrors = multierror.Unwrap(vErrors...)
klog.V(4).Infof("validation errors: %d", len(vErrors))
if len(vErrors) > 0 {
for _, err := range vErrors {
handleValidationError(eventChannel, err)
}
// Exit early, if configured to do so
if options.ValidationPolicy == validation.ExitEarly {
handleError(eventChannel, multierror.New(vErrors))
return
}
}

// Extract invalid objects from the validation errors
invalidIds := invalidIds(vErrors)
klog.V(4).Infof("invalid objects: %d", len(invalidIds))

// Remove invalid objects from the apply & prune sets
applySets = removeObjects(applySets, invalidIds)
pruneSets = removeObjects(pruneSets, invalidIds)

// Build a TaskContext for passing info between tasks
resourceCache := cache.NewResourceCacheMap()
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)

// Register invalid objects to be retained in the inventory, if present.
for _, id := range invalidIds {
taskContext.AddInvalidObject(id)
}

// Fetch the queue (channel) of tasks that should be executed.
klog.V(4).Infoln("applier building task queue...")
taskBuilder := &solver.TaskQueueBuilder{
Expand Down Expand Up @@ -195,12 +242,10 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
InvPolicy: options.InventoryPolicy,
},
filter.LocalNamespacesFilter{
LocalNamespaces: localNamespaces(invInfo, object.UnstructuredsToObjMetasOrDie(objects)),
LocalNamespaces: localNamespaces(invInfo, object.UnstructuredSetToObjMetadataSet(objects)),
},
}
// Build list of apply mutators.
// Share a thread-safe cache with the status poller.
resourceCache := cache.NewResourceCacheMap()
applyMutators := []mutator.Interface{
&mutator.ApplyTimeMutator{
Client: client,
Expand All @@ -211,8 +256,8 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
// Build the task queue by appending tasks in the proper order.
taskQueue, err := taskBuilder.
AppendInvAddTask(invInfo, applyObjs, options.DryRunStrategy).
AppendApplyWaitTasks(applyObjs, applyFilters, applyMutators, opts).
AppendPruneWaitTasks(pruneObjs, pruneFilters, opts).
AppendApplyWaitTasks(applySets, applyFilters, applyMutators, opts).
AppendPruneWaitTasks(pruneSets, pruneFilters, opts).
AppendInvSetTask(invInfo, options.DryRunStrategy).
Build()
if err != nil {
Expand All @@ -229,10 +274,10 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
}
// Create a new TaskStatusRunner to execute the taskQueue.
klog.V(4).Infoln("applier building TaskStatusRunner...")
allIds := object.UnstructuredsToObjMetasOrDie(append(applyObjs, pruneObjs...))
runner := taskrunner.NewTaskStatusRunner(allIds, a.StatusPoller, resourceCache)
allIds := object.UnstructuredSetToObjMetadataSet(append(applyObjs, pruneObjs...))
runner := taskrunner.NewTaskStatusRunner(allIds, a.StatusPoller)
klog.V(4).Infoln("applier running TaskStatusRunner...")
err = runner.Run(ctx, taskQueue.ToChannel(), eventChannel, taskrunner.Options{
err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
PollInterval: options.PollInterval,
UseCache: true,
EmitStatusEvents: options.EmitStatusEvents,
Expand Down Expand Up @@ -282,6 +327,9 @@ type Options struct {

// InventoryPolicy defines the inventory policy of apply.
InventoryPolicy inventory.InventoryPolicy

// ValidationPolicy defines how to handle invalid objects.
ValidationPolicy validation.Policy
}

// setDefaults set the options to the default values if they
Expand Down Expand Up @@ -321,3 +369,50 @@ func localNamespaces(localInv inventory.InventoryInfo, localObjs []object.ObjMet
}
return namespaces
}

func handleValidationError(eventChannel chan<- event.Event, err error) {
switch tErr := err.(type) {
case validation.Error:
// handle validation error about one or more specific objects
eventChannel <- event.Event{
Type: event.ValidationType,
ValidationEvent: event.ValidationEvent{
Identifiers: tErr.Identifiers(),
Error: tErr,
},
}
default:
// handle general validation error (no specific object)
eventChannel <- event.Event{
Type: event.ValidationType,
ValidationEvent: event.ValidationEvent{
Error: tErr,
},
}
}
}

// invalidIds extracts unique IDs from ValidationErrors
func invalidIds(errs []error) object.ObjMetadataSet {
var ids object.ObjMetadataSet
for _, err := range errs {
if tErr, ok := err.(validation.Error); ok {
ids = append(ids, tErr.Identifiers()...)
}
}
return ids.Unique()
}

// removeObjects removes objects from the setList that match ids in the removeSet.
// The original setList is modified and returned.
func removeObjects(setList []object.UnstructuredSet, removeSet object.ObjMetadataSet) []object.UnstructuredSet {
for i := range setList {
for j := len(setList[i]) - 1; j >= 0; j-- {
id := object.UnstructuredToObjMetadata(setList[i][j])
if removeSet.Contains(id) {
setList[i] = append(setList[i][:j], setList[i][j+1:]...)
}
}
}
return setList
}
Loading

0 comments on commit 0536309

Please sign in to comment.