Skip to content

Commit

Permalink
Merge pull request #488 from karlkfi/karl-validation-event
Browse files Browse the repository at this point in the history
feat: Add ValidationPolicy & ValidationEvent
  • Loading branch information
k8s-ci-robot authored Jan 28, 2022
2 parents 95da18d + 9acdbce commit 5fb19a1
Show file tree
Hide file tree
Showing 45 changed files with 2,473 additions and 480 deletions.
72 changes: 62 additions & 10 deletions pkg/apply/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,14 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje

// Validate the resources to make sure we catch those problems early
// before anything has been updated in the cluster.
validator := &validation.Validator{Mapper: mapper}
if err := validator.Validate(objects); err != nil {
handleError(eventChannel, err)
return
vCollector := &validation.Collector{}
validator := &validation.Validator{
Collector: vCollector,
Mapper: mapper,
}
validator.Validate(objects)

// Decide which objects to apply and which to prune
applyObjs, pruneObjs, err := a.prepareObjects(invInfo, objects, options)
if err != nil {
handleError(eventChannel, err)
Expand All @@ -166,6 +168,7 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
Mapper: mapper,
InvClient: a.invClient,
Destroy: false,
Collector: vCollector,
}
opts := solver.Options{
ServerSideOptions: options.ServerSideOptions,
Expand Down Expand Up @@ -199,7 +202,6 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
},
}
// Build list of apply mutators.
// Share a thread-safe cache with the status poller.
resourceCache := cache.NewResourceCacheMap()
applyMutators := []mutator.Interface{
&mutator.ApplyTimeMutator{
Expand All @@ -208,17 +210,43 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
ResourceCache: resourceCache,
},
}
// Build the task queue by appending tasks in the proper order.
taskQueue, err := taskBuilder.

// Build the ordered set of tasks to execute.
taskQueue := taskBuilder.
AppendInvAddTask(invInfo, applyObjs, options.DryRunStrategy).
AppendApplyWaitTasks(applyObjs, applyFilters, applyMutators, opts).
AppendPruneWaitTasks(pruneObjs, pruneFilters, opts).
AppendInvSetTask(invInfo, options.DryRunStrategy).
Build()
if err != nil {
handleError(eventChannel, err)

klog.V(4).Infof("validation errors: %d", len(vCollector.Errors))
klog.V(4).Infof("invalid objects: %d", len(vCollector.InvalidIds))

// Handle validation errors
switch options.ValidationPolicy {
case validation.ExitEarly:
err = vCollector.ToError()
if err != nil {
handleError(eventChannel, err)
return
}
case validation.SkipInvalid:
for _, err := range vCollector.Errors {
handleValidationError(eventChannel, err)
}
default:
handleError(eventChannel, fmt.Errorf("invalid ValidationPolicy: %q", options.ValidationPolicy))
return
}

// Build a TaskContext for passing info between tasks
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)

// Register invalid objects to be retained in the inventory, if present.
for _, id := range vCollector.InvalidIds {
taskContext.AddInvalidObject(id)
}

// Send event to inform the caller about the resources that
// will be applied/pruned.
eventChannel <- event.Event{
Expand All @@ -231,7 +259,6 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
klog.V(4).Infoln("applier building TaskStatusRunner...")
allIds := object.UnstructuredSetToObjMetadataSet(append(applyObjs, pruneObjs...))
runner := taskrunner.NewTaskStatusRunner(allIds, a.StatusPoller)
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
klog.V(4).Infoln("applier running TaskStatusRunner...")
err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
PollInterval: options.PollInterval,
Expand Down Expand Up @@ -283,6 +310,9 @@ type Options struct {

// InventoryPolicy defines the inventory policy of apply.
InventoryPolicy inventory.InventoryPolicy

// ValidationPolicy defines how to handle invalid objects.
ValidationPolicy validation.Policy
}

// setDefaults set the options to the default values if they
Expand Down Expand Up @@ -322,3 +352,25 @@ func localNamespaces(localInv inventory.InventoryInfo, localObjs []object.ObjMet
}
return namespaces
}

func handleValidationError(eventChannel chan<- event.Event, err error) {
switch tErr := err.(type) {
case *validation.Error:
// handle validation error about one or more specific objects
eventChannel <- event.Event{
Type: event.ValidationType,
ValidationEvent: event.ValidationEvent{
Identifiers: tErr.Identifiers(),
Error: tErr,
},
}
default:
// handle general validation error (no specific object)
eventChannel <- event.Event{
Type: event.ValidationType,
ValidationEvent: event.ValidationEvent{
Error: tErr,
},
}
}
}
Loading

0 comments on commit 5fb19a1

Please sign in to comment.