Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add support for external dependencies #412

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions pkg/apply/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/dependson"
"sigs.k8s.io/cli-utils/pkg/object/mutation"
"sigs.k8s.io/cli-utils/pkg/ordering"
)

Expand Down Expand Up @@ -151,7 +153,32 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
handleError(eventChannel, err)
return
}
klog.V(4).Infof("calculated %d apply objs; %d prune objs", len(applyObjs), len(pruneObjs))
klog.V(4).Infof("calculated operations (apply: %d, prune: %d)", len(applyObjs), len(pruneObjs))

// Build list of early mutators to execute before starting the task queue.
earlyMutators := []mutator.Interface{
// Set default namespace for SourceRefs in apply-time-mutation annotations
&mutator.Defaulter{
Mapper: mapper,
},
}
// Exec early mutators
err = mutator.MutateAll(ctx, applyObjs, earlyMutators)
if err != nil {
handleError(eventChannel, err)
return
}

deps, err := getDependencies(applyObjs)
if err != nil {
handleError(eventChannel, err)
return
}
pollIds := object.UnstructuredsToObjMetasOrDie(append(applyObjs, pruneObjs...))
externalIds := object.SetDiff(deps, pollIds)
pollIds = append(pollIds, externalIds...)
klog.V(4).Infof("calculated dependencies (total: %d, external: %d)", len(deps), len(externalIds))
klog.V(6).Infof("external dependencies: %#v)", externalIds)

// Fetch the queue (channel) of tasks that should be executed.
klog.V(4).Infoln("applier building task queue...")
Expand Down Expand Up @@ -192,9 +219,9 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
LocalNamespaces: localNamespaces(invInfo, object.UnstructuredsToObjMetasOrDie(objects)),
},
}
// Build list of apply mutators.
// Share a thread-safe cache with the status poller.
resourceCache := cache.NewResourceCacheMap()
// Build list of apply mutators.
applyMutators := []mutator.Interface{
&mutator.ApplyTimeMutator{
Client: client,
Expand Down Expand Up @@ -222,8 +249,7 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
}
// Create a new TaskStatusRunner to execute the taskQueue.
klog.V(4).Infoln("applier building TaskStatusRunner...")
allIds := object.UnstructuredsToObjMetasOrDie(append(applyObjs, pruneObjs...))
runner := taskrunner.NewTaskStatusRunner(allIds, a.statusPoller, resourceCache)
runner := taskrunner.NewTaskStatusRunner(pollIds, a.statusPoller, resourceCache)
klog.V(4).Infoln("applier running TaskStatusRunner...")
err = runner.Run(ctx, taskQueue.ToChannel(), eventChannel, taskrunner.Options{
PollInterval: options.PollInterval,
Expand Down Expand Up @@ -314,3 +340,17 @@ func localNamespaces(localInv inventory.InventoryInfo, localObjs []object.ObjMet
}
return namespaces
}

func getDependencies(objs []*unstructured.Unstructured) (object.ObjMetadataSet, error) {
mutationDeps, err := mutation.GetDependencies(objs)
if err != nil {
return nil, err
}

dependsonDeps, err := dependson.GetDependencies(objs)
if err != nil {
return nil, err
}

return object.Union(mutationDeps, dependsonDeps), nil
}
112 changes: 112 additions & 0 deletions pkg/apply/mutator/defaulter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2021 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0

package mutator

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog/v2"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/mutation"
)

type Defaulter struct {
Mapper meta.RESTMapper
}

// Name returns a mutator identifier for logging.
func (d *Defaulter) Name() string {
return "Defaulter"
}

// Mutate parses the apply-time-mutation annotation and loops through the
// substitutions, checking each source resource. If not specified, the source
// resource namespace is inherited from the target resource, as long as both the
// target resource and source resource are namespaced.
func (d *Defaulter) Mutate(ctx context.Context, obj *unstructured.Unstructured) (bool, string, error) {
mutated := false
reason := ""

subs, err := mutation.ReadAnnotation(obj)
if err != nil {
return mutated, reason, err
}
namespace := obj.GetNamespace()
newSubs, err := d.applyDefaultNamespace(subs, obj.GetNamespace())
if err != nil {
return mutated, reason, err
}
if !newSubs.Equal(subs) {
klog.V(5).Infof("updated mutation after defaults:\n%s", object.YamlStringer{O: subs})
err = mutation.WriteAnnotation(obj, newSubs)
if err != nil {
return mutated, reason, err
}
mutated = true
reason = fmt.Sprintf("annotation value updated to inherit namespace (annotation: %q, namespace: %q)", mutation.Annotation, namespace)
}
return mutated, reason, nil
}

// return fmt.Errorf("failed to update resource (%s) with defaults for apply-time-mutation: %w", targetRef, err)

func (d *Defaulter) applyDefaultNamespace(subs mutation.ApplyTimeMutation, namespace string) (mutation.ApplyTimeMutation, error) {
newSubs := make(mutation.ApplyTimeMutation, len(subs))
for i, sub := range subs {
// lookup REST mapping
sourceMapping, err := d.getMapping(sub.SourceRef)
if err != nil {
// If we can't find a match, just keep going. This can happen
// if CRDs and CRs are applied at the same time.
//
// As long as Mutate() also applies the default namespace,
// the only case that can't use namespace defaulting is resources
// applied asynchrounously by another client, with a CRD in this
// apply set,
if meta.IsNoMatchError(err) {
klog.V(5).Infof("source resource (%s) scope: unknown", sub.SourceRef)
newSubs[i] = sub
continue
}
return subs, fmt.Errorf("failed to identify source resource mapping (%s): %w", sub.SourceRef, err)
}

klog.V(5).Infof("source resource (%s) scope: %s", sub.SourceRef, sourceMapping.Scope.Name())

// Default source namespace to target namesapce, if namespace-scoped
if sub.SourceRef.Namespace == "" && sourceMapping.Scope.Name() == meta.RESTScopeNameNamespace {
// namespace required
if namespace == "" {
// Empty namespace could mean an invalid target resource
// OR a cluster-scoped target resource.
// But we'll use the same error for both,
// to avoid needing to look up the mapping.
return subs, fmt.Errorf("failed to inherit namespace for source resource reference (%s): target resource namespace is empty", sub.SourceRef)
}
sub.SourceRef.Namespace = namespace
klog.V(5).Infof("source resource (%s) inherited target resource namespace (%s)", sub.SourceRef, namespace)
}
newSubs[i] = sub
}
return newSubs, nil
}

func (d *Defaulter) getMapping(ref mutation.ResourceReference) (*meta.RESTMapping, error) {
// lookup resource using group api version, if specified
sourceGvk := ref.GroupVersionKind()
var mapping *meta.RESTMapping
var err error
if sourceGvk.Version != "" {
mapping, err = d.Mapper.RESTMapping(sourceGvk.GroupKind(), sourceGvk.Version)
} else {
mapping, err = d.Mapper.RESTMapping(sourceGvk.GroupKind())
}
if err != nil {
return nil, err
}
return mapping, nil
}
Loading