Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1943 from 2opremio/1941-fix-CRD-syncing
Browse files Browse the repository at this point in the history
Exclude resources if we cannot find their scope
  • Loading branch information
Alfonso Acosta authored Apr 16, 2019
2 parents 7563152 + 6f2eba2 commit fec2e9f
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 56 deletions.
2 changes: 1 addition & 1 deletion cluster/kubernetes/doc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
Package kubernetes provides implementations of `Cluster` and
`Manifests` that interact with the Kubernetes API (using kubectl or
`manifests` that interact with the Kubernetes API (using kubectl or
the k8s API client).
*/
package kubernetes
55 changes: 43 additions & 12 deletions cluster/kubernetes/manifests.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package kubernetes

import (
"fmt"
"strings"

"github.com/go-kit/kit/log"
"gopkg.in/yaml.v2"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -24,13 +28,23 @@ type namespacer interface {
EffectiveNamespace(manifest kresource.KubeManifest, knownScopes ResourceScopes) (string, error)
}

// Manifests is an implementation of cluster.Manifests, particular to
// manifests is an implementation of cluster.Manifests, particular to
// Kubernetes. Aside from loading manifests from files, it does some
// "post-processsing" to make sure the view of the manifests is what
// would be applied; in particular, it fills in the namespace of
// manifests that would be given a default namespace when applied.
type Manifests struct {
Namespacer namespacer
type manifests struct {
namespacer namespacer
logger log.Logger
resourceWarnings map[string]struct{}
}

func NewManifests(ns namespacer, logger log.Logger) *manifests {
return &manifests{
namespacer: ns,
logger: logger,
resourceWarnings: map[string]struct{}{},
}
}

func getCRDScopes(manifests map[string]kresource.KubeManifest) ResourceScopes {
Expand Down Expand Up @@ -60,31 +74,48 @@ func getCRDScopes(manifests map[string]kresource.KubeManifest) ResourceScopes {
return result
}

func setEffectiveNamespaces(manifests map[string]kresource.KubeManifest, nser namespacer) (map[string]resource.Resource, error) {
func (m *manifests) setEffectiveNamespaces(manifests map[string]kresource.KubeManifest) (map[string]resource.Resource, error) {
knownScopes := getCRDScopes(manifests)
result := map[string]resource.Resource{}
for _, km := range manifests {
if nser != nil {
ns, err := nser.EffectiveNamespace(km, knownScopes)
if err != nil {
return nil, err
resID := km.ResourceID()
resIDStr := resID.String()
ns, err := m.namespacer.EffectiveNamespace(km, knownScopes)
if err != nil {
if strings.Contains(err.Error(), "not found") {
// discard the resource and keep going after making sure we logged about it
if _, warningLogged := m.resourceWarnings[resIDStr]; !warningLogged {
_, kind, name := resID.Components()
partialResIDStr := kind + "/" + name
m.logger.Log(
"warn", fmt.Sprintf("cannot find scope of resource %s: %s", partialResIDStr, err),
"impact", fmt.Sprintf("resource %s will be excluded until its scope is available", partialResIDStr))
m.resourceWarnings[resIDStr] = struct{}{}
}
continue
}
km.SetNamespace(ns)
return nil, err
}
km.SetNamespace(ns)
if _, warningLogged := m.resourceWarnings[resIDStr]; warningLogged {
// indicate that we found the resource's scope and allow logging a warning again
m.logger.Log("info", fmt.Sprintf("found scope of resource %s, back in bussiness!", km.ResourceID().String()))
delete(m.resourceWarnings, resIDStr)
}
result[km.ResourceID().String()] = km
}
return result, nil
}

func (m *Manifests) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) {
func (m *manifests) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) {
manifests, err := kresource.Load(base, paths)
if err != nil {
return nil, err
}
return setEffectiveNamespaces(manifests, m.Namespacer)
return m.setEffectiveNamespaces(manifests)
}

func (m *Manifests) UpdateImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) {
func (m *manifests) UpdateImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) {
return updateWorkload(def, id, container, image)
}

Expand Down
88 changes: 78 additions & 10 deletions cluster/kubernetes/manifests_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
package kubernetes

import (
"bytes"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/weaveworks/flux/cluster/kubernetes/testfiles"
)

func TestKnownCRDScope(t *testing.T) {
func TestLocalCRDScope(t *testing.T) {
coreClient := makeFakeClient()

nser, err := NewNamespacer(coreClient.Discovery())
if err != nil {
t.Fatal(err)
}
manifests := Manifests{nser}
assert.NoError(t, err)
manifests := NewManifests(nser, log.NewLogfmtLogger(os.Stdout))

dir, cleanup := testfiles.TempDir(t)
defer cleanup()
Expand Down Expand Up @@ -46,17 +50,81 @@ metadata:
namespace: bar
`

if err = ioutil.WriteFile(filepath.Join(dir, "test.yaml"), []byte(defs), 0600); err != nil {
t.Fatal(err)
}
err = ioutil.WriteFile(filepath.Join(dir, "test.yaml"), []byte(defs), 0600)
assert.NoError(t, err)

resources, err := manifests.LoadManifests(dir, []string{dir})
if err != nil {
t.Fatal(err)
}

if _, ok := resources["bar:foo/fooinstance"]; !ok {
t.Fatal("couldn't find crd instance")
assert.Contains(t, resources, "bar:foo/fooinstance")
}

func TestUnKnownCRDScope(t *testing.T) {
coreClient := makeFakeClient()

nser, err := NewNamespacer(coreClient.Discovery())
assert.NoError(t, err)
logBuffer := bytes.NewBuffer(nil)
manifests := NewManifests(nser, log.NewLogfmtLogger(logBuffer))

dir, cleanup := testfiles.TempDir(t)
defer cleanup()
const defs = `---
apiVersion: v1
kind: Namespace
metadata:
name: mynamespace
---
apiVersion: foo.example.com/v1beta1
kind: Foo
metadata:
name: fooinstance
namespace: bar
`

err = ioutil.WriteFile(filepath.Join(dir, "test.yaml"), []byte(defs), 0600)
assert.NoError(t, err)

resources, err := manifests.LoadManifests(dir, []string{dir})
assert.NoError(t, err)

// can't contain the CRD since we cannot figure out its scope
assert.NotContains(t, resources, "bar:foo/fooinstance")

// however, it should contain the namespace
assert.Contains(t, resources, "<cluster>:namespace/mynamespace")

savedLog := logBuffer.String()
// and we should had logged a warning about it
assert.Contains(t, savedLog, "cannot find scope of resource foo/fooinstance")

// loading again shouldn't result in more warnings
resources, err = manifests.LoadManifests(dir, []string{dir})
assert.NoError(t, err)
assert.Equal(t, logBuffer.String(), savedLog)

// But getting the scope of the CRD should result in a log saying we found the scope
apiResourcesWithoutFoo := coreClient.Resources
apiResource := &metav1.APIResourceList{
GroupVersion: "foo.example.com/v1beta1",
APIResources: []metav1.APIResource{
{Name: "foos", SingularName: "foo", Namespaced: true, Kind: "Foo"},
},
}
coreClient.Resources = append(coreClient.Resources, apiResource)

logBuffer.Reset()
resources, err = manifests.LoadManifests(dir, []string{dir})
assert.NoError(t, err)
assert.Len(t, resources, 2)
assert.Contains(t, logBuffer.String(), "found scope of resource bar:foo/fooinstance")

// and missing the scope information again should result in another warning
coreClient.Resources = apiResourcesWithoutFoo
logBuffer.Reset()
resources, err = manifests.LoadManifests(dir, []string{dir})
assert.NoError(t, err)
assert.Contains(t, savedLog, "cannot find scope of resource foo/fooinstance")
}
9 changes: 9 additions & 0 deletions cluster/kubernetes/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package kubernetes

import kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"

type ConstNamespacer string

func (ns ConstNamespacer) EffectiveNamespace(manifest kresource.KubeManifest, _ ResourceScopes) (string, error) {
return string(ns), nil
}
1 change: 1 addition & 0 deletions cluster/kubernetes/namespacer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,6 @@ func (n *namespaceViaDiscovery) lookupNamespacedInCluster(groupVersion, kind str
return resource.Namespaced, nil
}
}

return false, fmt.Errorf("resource not found for API %s, kind %s", groupVersion, kind)
}
6 changes: 3 additions & 3 deletions cluster/kubernetes/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/weaveworks/flux/resource"
)

func (m *Manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error) {
func (m *manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error) {
ns, kind, name := id.Components()
add, del := update.Add, update.Remove

Expand Down Expand Up @@ -48,7 +48,7 @@ func (m *Manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy
return (KubeYAML{}).Annotate(def, ns, kind, name, args...)
}

func (m *Manifests) extractWorkloadContainers(def []byte, id flux.ResourceID) ([]resource.Container, error) {
func (m *manifests) extractWorkloadContainers(def []byte, id flux.ResourceID) ([]resource.Container, error) {
kresources, err := kresource.ParseMultidoc(def, "stdin")
if err != nil {
return nil, err
Expand All @@ -58,7 +58,7 @@ func (m *Manifests) extractWorkloadContainers(def []byte, id flux.ResourceID) ([
// We could get out of our way to fix this (or give a better error) but:
// 1. With the exception of HelmReleases CRD instances are not workloads anyways.
// 2. The problem is eventually fixed by the first successful sync.
resources, err := setEffectiveNamespaces(kresources, m.Namespacer)
resources, err := m.setEffectiveNamespaces(kresources)
if err != nil {
return nil, err
}
Expand Down
14 changes: 5 additions & 9 deletions cluster/kubernetes/policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,17 @@ package kubernetes

import (
"bytes"
"os"
"testing"
"text/template"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"

"github.com/weaveworks/flux"
kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"
"github.com/weaveworks/flux/policy"
)

type constNamespacer string

func (ns constNamespacer) EffectiveNamespace(manifest kresource.KubeManifest, _ ResourceScopes) (string, error) {
return string(ns), nil
}

func TestUpdatePolicies(t *testing.T) {
for _, c := range []struct {
name string
Expand Down Expand Up @@ -186,7 +181,8 @@ func TestUpdatePolicies(t *testing.T) {
caseIn := templToString(t, annotationsTemplate, c.in)
caseOut := templToString(t, annotationsTemplate, c.out)
resourceID := flux.MustParseResourceID("default:deployment/nginx")
out, err := (&Manifests{constNamespacer("default")}).UpdatePolicies([]byte(caseIn), resourceID, c.update)
manifests := NewManifests(ConstNamespacer("default"), log.NewLogfmtLogger(os.Stdout))
out, err := manifests.UpdatePolicies([]byte(caseIn), resourceID, c.update)
assert.Equal(t, c.wantErr, err != nil, "unexpected error value: %s", err)
if !c.wantErr {
assert.Equal(t, string(out), caseOut)
Expand All @@ -200,7 +196,7 @@ func TestUpdatePolicies_invalidTagPattern(t *testing.T) {
update := policy.Update{
Add: policy.Set{policy.TagPrefix("nginx"): "semver:invalid"},
}
_, err := (&Manifests{}).UpdatePolicies(nil, resourceID, update)
_, err := (&manifests{}).UpdatePolicies(nil, resourceID, update)
assert.Error(t, err)
}

Expand Down
4 changes: 3 additions & 1 deletion cluster/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"fmt"
"os"
"sort"
"strings"
"testing"
Expand Down Expand Up @@ -313,14 +314,15 @@ metadata:
if err != nil {
t.Fatal(err)
}
manifests := NewManifests(namespacer, log.NewLogfmtLogger(os.Stdout))

resources0, err := kresource.ParseMultidoc([]byte(defs), "before")
if err != nil {
t.Fatal(err)
}

// Needed to get from KubeManifest to resource.Resource
resources, err := setEffectiveNamespaces(resources0, namespacer)
resources, err := manifests.setEffectiveNamespaces(resources0)
if err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 3 additions & 4 deletions cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func main() {
var clusterVersion string
var sshKeyRing ssh.KeyRing
var k8s cluster.Cluster
var k8sManifests *kubernetes.Manifests
var k8sManifests cluster.Manifests
var imageCreds func() registry.ImageCreds
{
restClientConfig, err := rest.InClusterConfig()
Expand Down Expand Up @@ -373,13 +373,12 @@ func main() {
imageCreds = k8sInst.ImagesToFetch
// There is only one way we currently interpret a repo of
// files as manifests, and that's as Kubernetes yamels.
k8sManifests = &kubernetes.Manifests{}
k8sManifests.Namespacer, err = kubernetes.NewNamespacer(discoClientset)

namespacer, err := kubernetes.NewNamespacer(discoClientset)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
k8sManifests = kubernetes.NewManifests(namespacer, logger)
}

// Wrap the procedure for collecting images to scan
Expand Down
4 changes: 3 additions & 1 deletion daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,12 +731,14 @@ func mockDaemon(t *testing.T) (*Daemon, func(), func(), *cluster.Mock, *mockEven
// Jobs queue (starts itself)
jobs := job.NewQueue(jshutdown, jwg)

manifests := kubernetes.NewManifests(alwaysDefault, log.NewLogfmtLogger(os.Stdout))

// Finally, the daemon
d := &Daemon{
Repo: repo,
GitConfig: params,
Cluster: k8s,
Manifests: &kubernetes.Manifests{Namespacer: alwaysDefault},
Manifests: manifests,
Registry: imageRegistry,
V: testVersion,
Jobs: jobs,
Expand Down
4 changes: 3 additions & 1 deletion daemon/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ func daemon(t *testing.T) (*Daemon, func()) {
UserEmail: gitEmail,
}

manifests := kubernetes.NewManifests(alwaysDefault, log.NewLogfmtLogger(os.Stdout))

jobs := job.NewQueue(shutdown, wg)
d := &Daemon{
Cluster: k8s,
Manifests: &kubernetes.Manifests{Namespacer: alwaysDefault},
Manifests: manifests,
Registry: &registryMock.Registry{},
Repo: repo,
GitConfig: gitConfig,
Expand Down
Loading

0 comments on commit fec2e9f

Please sign in to comment.