From 6f2eba2865e068a531f2790301c703c29ff6c844 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 16 Apr 2019 02:41:38 +0200 Subject: [PATCH] Exclude resources if we cannot find their scope This is aiming at custom resources whose scope is unknown because its CRD is included in a helm chart Flux won't inspect (since it's the helm operator's responsibility to do it). This allows Flux to move forward and create the `HelmRelease` supplying the CRD. In a subsequent sync, once we know its scope, the custom resources will be created. --- cluster/kubernetes/doc.go | 2 +- cluster/kubernetes/manifests.go | 55 +++++++++++++---- cluster/kubernetes/manifests_test.go | 88 ++++++++++++++++++++++++---- cluster/kubernetes/mock.go | 9 +++ cluster/kubernetes/namespacer.go | 1 + cluster/kubernetes/policies.go | 6 +- cluster/kubernetes/policies_test.go | 14 ++--- cluster/kubernetes/sync_test.go | 4 +- cmd/fluxd/main.go | 7 +-- daemon/daemon_test.go | 4 +- daemon/loop_test.go | 4 +- release/releaser_test.go | 21 +++---- sync/sync_test.go | 4 +- 13 files changed, 163 insertions(+), 56 deletions(-) create mode 100644 cluster/kubernetes/mock.go diff --git a/cluster/kubernetes/doc.go b/cluster/kubernetes/doc.go index 230cdba94..6a50ce369 100644 --- a/cluster/kubernetes/doc.go +++ b/cluster/kubernetes/doc.go @@ -1,6 +1,6 @@ /* Package kubernetes provides implementations of `Cluster` and -`Manifests` that interact with the Kubernetes API (using kubectl or +`manifests` that interact with the Kubernetes API (using kubectl or the k8s API client). */ package kubernetes diff --git a/cluster/kubernetes/manifests.go b/cluster/kubernetes/manifests.go index 9b5a757ad..f02ad309d 100644 --- a/cluster/kubernetes/manifests.go +++ b/cluster/kubernetes/manifests.go @@ -1,6 +1,10 @@ package kubernetes import ( + "fmt" + "strings" + + "github.com/go-kit/kit/log" "gopkg.in/yaml.v2" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -24,13 +28,23 @@ type namespacer interface { EffectiveNamespace(manifest kresource.KubeManifest, knownScopes ResourceScopes) (string, error) } -// Manifests is an implementation of cluster.Manifests, particular to +// manifests is an implementation of cluster.Manifests, particular to // Kubernetes. Aside from loading manifests from files, it does some // "post-processsing" to make sure the view of the manifests is what // would be applied; in particular, it fills in the namespace of // manifests that would be given a default namespace when applied. -type Manifests struct { - Namespacer namespacer +type manifests struct { + namespacer namespacer + logger log.Logger + resourceWarnings map[string]struct{} +} + +func NewManifests(ns namespacer, logger log.Logger) *manifests { + return &manifests{ + namespacer: ns, + logger: logger, + resourceWarnings: map[string]struct{}{}, + } } func getCRDScopes(manifests map[string]kresource.KubeManifest) ResourceScopes { @@ -60,31 +74,48 @@ func getCRDScopes(manifests map[string]kresource.KubeManifest) ResourceScopes { return result } -func setEffectiveNamespaces(manifests map[string]kresource.KubeManifest, nser namespacer) (map[string]resource.Resource, error) { +func (m *manifests) setEffectiveNamespaces(manifests map[string]kresource.KubeManifest) (map[string]resource.Resource, error) { knownScopes := getCRDScopes(manifests) result := map[string]resource.Resource{} for _, km := range manifests { - if nser != nil { - ns, err := nser.EffectiveNamespace(km, knownScopes) - if err != nil { - return nil, err + resID := km.ResourceID() + resIDStr := resID.String() + ns, err := m.namespacer.EffectiveNamespace(km, knownScopes) + if err != nil { + if strings.Contains(err.Error(), "not found") { + // discard the resource and keep going after making sure we logged about it + if _, warningLogged := m.resourceWarnings[resIDStr]; !warningLogged { + _, kind, name := resID.Components() + partialResIDStr := kind + "/" + name + m.logger.Log( + "warn", fmt.Sprintf("cannot find scope of resource %s: %s", partialResIDStr, err), + "impact", fmt.Sprintf("resource %s will be excluded until its scope is available", partialResIDStr)) + m.resourceWarnings[resIDStr] = struct{}{} + } + continue } - km.SetNamespace(ns) + return nil, err + } + km.SetNamespace(ns) + if _, warningLogged := m.resourceWarnings[resIDStr]; warningLogged { + // indicate that we found the resource's scope and allow logging a warning again + m.logger.Log("info", fmt.Sprintf("found scope of resource %s, back in bussiness!", km.ResourceID().String())) + delete(m.resourceWarnings, resIDStr) } result[km.ResourceID().String()] = km } return result, nil } -func (m *Manifests) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) { +func (m *manifests) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) { manifests, err := kresource.Load(base, paths) if err != nil { return nil, err } - return setEffectiveNamespaces(manifests, m.Namespacer) + return m.setEffectiveNamespaces(manifests) } -func (m *Manifests) UpdateImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) { +func (m *manifests) UpdateImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) { return updateWorkload(def, id, container, image) } diff --git a/cluster/kubernetes/manifests_test.go b/cluster/kubernetes/manifests_test.go index 7e4494edc..732f50389 100644 --- a/cluster/kubernetes/manifests_test.go +++ b/cluster/kubernetes/manifests_test.go @@ -1,21 +1,25 @@ package kubernetes import ( + "bytes" "io/ioutil" + "os" "path/filepath" "testing" + "github.com/go-kit/kit/log" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/weaveworks/flux/cluster/kubernetes/testfiles" ) -func TestKnownCRDScope(t *testing.T) { +func TestLocalCRDScope(t *testing.T) { coreClient := makeFakeClient() nser, err := NewNamespacer(coreClient.Discovery()) - if err != nil { - t.Fatal(err) - } - manifests := Manifests{nser} + assert.NoError(t, err) + manifests := NewManifests(nser, log.NewLogfmtLogger(os.Stdout)) dir, cleanup := testfiles.TempDir(t) defer cleanup() @@ -46,17 +50,81 @@ metadata: namespace: bar ` - if err = ioutil.WriteFile(filepath.Join(dir, "test.yaml"), []byte(defs), 0600); err != nil { - t.Fatal(err) - } + err = ioutil.WriteFile(filepath.Join(dir, "test.yaml"), []byte(defs), 0600) + assert.NoError(t, err) resources, err := manifests.LoadManifests(dir, []string{dir}) if err != nil { t.Fatal(err) } - if _, ok := resources["bar:foo/fooinstance"]; !ok { - t.Fatal("couldn't find crd instance") + assert.Contains(t, resources, "bar:foo/fooinstance") +} + +func TestUnKnownCRDScope(t *testing.T) { + coreClient := makeFakeClient() + + nser, err := NewNamespacer(coreClient.Discovery()) + assert.NoError(t, err) + logBuffer := bytes.NewBuffer(nil) + manifests := NewManifests(nser, log.NewLogfmtLogger(logBuffer)) + + dir, cleanup := testfiles.TempDir(t) + defer cleanup() + const defs = `--- +apiVersion: v1 +kind: Namespace +metadata: + name: mynamespace +--- +apiVersion: foo.example.com/v1beta1 +kind: Foo +metadata: + name: fooinstance + namespace: bar +` + + err = ioutil.WriteFile(filepath.Join(dir, "test.yaml"), []byte(defs), 0600) + assert.NoError(t, err) + + resources, err := manifests.LoadManifests(dir, []string{dir}) + assert.NoError(t, err) + + // can't contain the CRD since we cannot figure out its scope + assert.NotContains(t, resources, "bar:foo/fooinstance") + + // however, it should contain the namespace + assert.Contains(t, resources, ":namespace/mynamespace") + + savedLog := logBuffer.String() + // and we should had logged a warning about it + assert.Contains(t, savedLog, "cannot find scope of resource foo/fooinstance") + + // loading again shouldn't result in more warnings + resources, err = manifests.LoadManifests(dir, []string{dir}) + assert.NoError(t, err) + assert.Equal(t, logBuffer.String(), savedLog) + + // But getting the scope of the CRD should result in a log saying we found the scope + apiResourcesWithoutFoo := coreClient.Resources + apiResource := &metav1.APIResourceList{ + GroupVersion: "foo.example.com/v1beta1", + APIResources: []metav1.APIResource{ + {Name: "foos", SingularName: "foo", Namespaced: true, Kind: "Foo"}, + }, } + coreClient.Resources = append(coreClient.Resources, apiResource) + + logBuffer.Reset() + resources, err = manifests.LoadManifests(dir, []string{dir}) + assert.NoError(t, err) + assert.Len(t, resources, 2) + assert.Contains(t, logBuffer.String(), "found scope of resource bar:foo/fooinstance") + // and missing the scope information again should result in another warning + coreClient.Resources = apiResourcesWithoutFoo + logBuffer.Reset() + resources, err = manifests.LoadManifests(dir, []string{dir}) + assert.NoError(t, err) + assert.Contains(t, savedLog, "cannot find scope of resource foo/fooinstance") } diff --git a/cluster/kubernetes/mock.go b/cluster/kubernetes/mock.go new file mode 100644 index 000000000..d9328615e --- /dev/null +++ b/cluster/kubernetes/mock.go @@ -0,0 +1,9 @@ +package kubernetes + +import kresource "github.com/weaveworks/flux/cluster/kubernetes/resource" + +type ConstNamespacer string + +func (ns ConstNamespacer) EffectiveNamespace(manifest kresource.KubeManifest, _ ResourceScopes) (string, error) { + return string(ns), nil +} diff --git a/cluster/kubernetes/namespacer.go b/cluster/kubernetes/namespacer.go index d2f9e06a9..f45fba6d4 100644 --- a/cluster/kubernetes/namespacer.go +++ b/cluster/kubernetes/namespacer.go @@ -97,5 +97,6 @@ func (n *namespaceViaDiscovery) lookupNamespacedInCluster(groupVersion, kind str return resource.Namespaced, nil } } + return false, fmt.Errorf("resource not found for API %s, kind %s", groupVersion, kind) } diff --git a/cluster/kubernetes/policies.go b/cluster/kubernetes/policies.go index 2dfda5d5e..f475d9f47 100644 --- a/cluster/kubernetes/policies.go +++ b/cluster/kubernetes/policies.go @@ -11,7 +11,7 @@ import ( "github.com/weaveworks/flux/resource" ) -func (m *Manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error) { +func (m *manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error) { ns, kind, name := id.Components() add, del := update.Add, update.Remove @@ -48,7 +48,7 @@ func (m *Manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy return (KubeYAML{}).Annotate(def, ns, kind, name, args...) } -func (m *Manifests) extractWorkloadContainers(def []byte, id flux.ResourceID) ([]resource.Container, error) { +func (m *manifests) extractWorkloadContainers(def []byte, id flux.ResourceID) ([]resource.Container, error) { kresources, err := kresource.ParseMultidoc(def, "stdin") if err != nil { return nil, err @@ -58,7 +58,7 @@ func (m *Manifests) extractWorkloadContainers(def []byte, id flux.ResourceID) ([ // We could get out of our way to fix this (or give a better error) but: // 1. With the exception of HelmReleases CRD instances are not workloads anyways. // 2. The problem is eventually fixed by the first successful sync. - resources, err := setEffectiveNamespaces(kresources, m.Namespacer) + resources, err := m.setEffectiveNamespaces(kresources) if err != nil { return nil, err } diff --git a/cluster/kubernetes/policies_test.go b/cluster/kubernetes/policies_test.go index 78af6d9ee..4c37ed571 100644 --- a/cluster/kubernetes/policies_test.go +++ b/cluster/kubernetes/policies_test.go @@ -2,22 +2,17 @@ package kubernetes import ( "bytes" + "os" "testing" "text/template" + "github.com/go-kit/kit/log" "github.com/stretchr/testify/assert" "github.com/weaveworks/flux" - kresource "github.com/weaveworks/flux/cluster/kubernetes/resource" "github.com/weaveworks/flux/policy" ) -type constNamespacer string - -func (ns constNamespacer) EffectiveNamespace(manifest kresource.KubeManifest, _ ResourceScopes) (string, error) { - return string(ns), nil -} - func TestUpdatePolicies(t *testing.T) { for _, c := range []struct { name string @@ -186,7 +181,8 @@ func TestUpdatePolicies(t *testing.T) { caseIn := templToString(t, annotationsTemplate, c.in) caseOut := templToString(t, annotationsTemplate, c.out) resourceID := flux.MustParseResourceID("default:deployment/nginx") - out, err := (&Manifests{constNamespacer("default")}).UpdatePolicies([]byte(caseIn), resourceID, c.update) + manifests := NewManifests(ConstNamespacer("default"), log.NewLogfmtLogger(os.Stdout)) + out, err := manifests.UpdatePolicies([]byte(caseIn), resourceID, c.update) assert.Equal(t, c.wantErr, err != nil, "unexpected error value: %s", err) if !c.wantErr { assert.Equal(t, string(out), caseOut) @@ -200,7 +196,7 @@ func TestUpdatePolicies_invalidTagPattern(t *testing.T) { update := policy.Update{ Add: policy.Set{policy.TagPrefix("nginx"): "semver:invalid"}, } - _, err := (&Manifests{}).UpdatePolicies(nil, resourceID, update) + _, err := (&manifests{}).UpdatePolicies(nil, resourceID, update) assert.Error(t, err) } diff --git a/cluster/kubernetes/sync_test.go b/cluster/kubernetes/sync_test.go index 02dacac2f..915a3827e 100644 --- a/cluster/kubernetes/sync_test.go +++ b/cluster/kubernetes/sync_test.go @@ -2,6 +2,7 @@ package kubernetes import ( "fmt" + "os" "sort" "strings" "testing" @@ -313,6 +314,7 @@ metadata: if err != nil { t.Fatal(err) } + manifests := NewManifests(namespacer, log.NewLogfmtLogger(os.Stdout)) resources0, err := kresource.ParseMultidoc([]byte(defs), "before") if err != nil { @@ -320,7 +322,7 @@ metadata: } // Needed to get from KubeManifest to resource.Resource - resources, err := setEffectiveNamespaces(resources0, namespacer) + resources, err := manifests.setEffectiveNamespaces(resources0) if err != nil { t.Fatal(err) } diff --git a/cmd/fluxd/main.go b/cmd/fluxd/main.go index 2a69572de..6858897ce 100644 --- a/cmd/fluxd/main.go +++ b/cmd/fluxd/main.go @@ -271,7 +271,7 @@ func main() { var clusterVersion string var sshKeyRing ssh.KeyRing var k8s cluster.Cluster - var k8sManifests *kubernetes.Manifests + var k8sManifests cluster.Manifests var imageCreds func() registry.ImageCreds { restClientConfig, err := rest.InClusterConfig() @@ -369,13 +369,12 @@ func main() { imageCreds = k8sInst.ImagesToFetch // There is only one way we currently interpret a repo of // files as manifests, and that's as Kubernetes yamels. - k8sManifests = &kubernetes.Manifests{} - k8sManifests.Namespacer, err = kubernetes.NewNamespacer(discoClientset) - + namespacer, err := kubernetes.NewNamespacer(discoClientset) if err != nil { logger.Log("err", err) os.Exit(1) } + k8sManifests = kubernetes.NewManifests(namespacer, logger) } // Wrap the procedure for collecting images to scan diff --git a/daemon/daemon_test.go b/daemon/daemon_test.go index 40fde1342..5c1e1bfac 100644 --- a/daemon/daemon_test.go +++ b/daemon/daemon_test.go @@ -731,12 +731,14 @@ func mockDaemon(t *testing.T) (*Daemon, func(), func(), *cluster.Mock, *mockEven // Jobs queue (starts itself) jobs := job.NewQueue(jshutdown, jwg) + manifests := kubernetes.NewManifests(alwaysDefault, log.NewLogfmtLogger(os.Stdout)) + // Finally, the daemon d := &Daemon{ Repo: repo, GitConfig: params, Cluster: k8s, - Manifests: &kubernetes.Manifests{Namespacer: alwaysDefault}, + Manifests: manifests, Registry: imageRegistry, V: testVersion, Jobs: jobs, diff --git a/daemon/loop_test.go b/daemon/loop_test.go index 4f11d0316..0a7db386f 100644 --- a/daemon/loop_test.go +++ b/daemon/loop_test.go @@ -60,10 +60,12 @@ func daemon(t *testing.T) (*Daemon, func()) { UserEmail: gitEmail, } + manifests := kubernetes.NewManifests(alwaysDefault, log.NewLogfmtLogger(os.Stdout)) + jobs := job.NewQueue(shutdown, wg) d := &Daemon{ Cluster: k8s, - Manifests: &kubernetes.Manifests{Namespacer: alwaysDefault}, + Manifests: manifests, Registry: ®istryMock.Registry{}, Repo: repo, GitConfig: gitConfig, diff --git a/release/releaser_test.go b/release/releaser_test.go index f90f5aae3..712686ab5 100644 --- a/release/releaser_test.go +++ b/release/releaser_test.go @@ -3,6 +3,7 @@ package release import ( "errors" "fmt" + "os" "reflect" "testing" "time" @@ -13,7 +14,6 @@ import ( "github.com/weaveworks/flux" "github.com/weaveworks/flux/cluster" "github.com/weaveworks/flux/cluster/kubernetes" - kresource "github.com/weaveworks/flux/cluster/kubernetes/resource" "github.com/weaveworks/flux/git" "github.com/weaveworks/flux/git/gittest" "github.com/weaveworks/flux/image" @@ -22,12 +22,6 @@ import ( "github.com/weaveworks/flux/update" ) -type constNamespacer string - -func (ns constNamespacer) EffectiveNamespace(manifest kresource.KubeManifest, _ kubernetes.ResourceScopes) (string, error) { - return string(ns), nil -} - var ( // This must match the value in cluster/kubernetes/testfiles/data.go helloContainer = "greeter" @@ -142,7 +136,7 @@ var ( }, }, } - mockManifests = &kubernetes.Manifests{Namespacer: constNamespacer("default")} + mockManifests = kubernetes.NewManifests(kubernetes.ConstNamespacer("default"), log.NewLogfmtLogger(os.Stdout)) ) func mockCluster(running ...cluster.Workload) *cluster.Mock { @@ -1055,9 +1049,9 @@ func testRelease(t *testing.T, ctx *ReleaseContext, spec update.ReleaseImageSpec // --- test verification -// A Manifests implementation that does updates incorrectly, so they should fail verification. +// A manifests implementation that does updates incorrectly, so they should fail verification. type badManifests struct { - kubernetes.Manifests + cluster.Manifests } func (m *badManifests) UpdateImage(def []byte, resourceID flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) { @@ -1075,15 +1069,16 @@ func Test_BadRelease(t *testing.T) { checkout1, cleanup1 := setup(t) defer cleanup1() + manifests := kubernetes.NewManifests(kubernetes.ConstNamespacer("default"), log.NewLogfmtLogger(os.Stdout)) ctx := &ReleaseContext{ cluster: cluster, - manifests: &kubernetes.Manifests{}, + manifests: manifests, repo: checkout1, registry: mockRegistry, } _, err := Release(ctx, spec, log.NewNopLogger()) if err != nil { - t.Fatal("release with 'good' Manifests should succeed, but errored:", err) + t.Fatal("release with 'good' manifests should succeed, but errored:", err) } checkout2, cleanup2 := setup(t) @@ -1091,7 +1086,7 @@ func Test_BadRelease(t *testing.T) { ctx = &ReleaseContext{ cluster: cluster, - manifests: &badManifests{Manifests: kubernetes.Manifests{constNamespacer("default")}}, + manifests: &badManifests{manifests}, repo: checkout2, registry: mockRegistry, } diff --git a/sync/sync_test.go b/sync/sync_test.go index 011bbc3d5..41da55a8b 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -1,8 +1,10 @@ package sync import ( + "os" "testing" + "github.com/go-kit/kit/log" "github.com/stretchr/testify/assert" "github.com/weaveworks/flux/cluster" @@ -19,7 +21,7 @@ func TestSync(t *testing.T) { defer cleanup() // Start with nothing running. We should be told to apply all the things. - manifests := &kubernetes.Manifests{} + manifests := kubernetes.NewManifests(kubernetes.ConstNamespacer("default"), log.NewLogfmtLogger(os.Stdout)) clus := &syncCluster{map[string]string{}} dirs := checkout.ManifestDirs()