Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Implement Generators and Updaters
Browse files Browse the repository at this point in the history
* Implement ResourceStore to abstract out manifest operations. ResourceStore
  is meant to abstract out file operations on explicit manifest
  files, paving the way for supporting programatically-generated cluster
  resources.

* Implement flux configfile parsing and execution
  • Loading branch information
2opremio committed Apr 3, 2019
1 parent ba3e794 commit 3fc3952
Show file tree
Hide file tree
Showing 22 changed files with 886 additions and 251 deletions.
31 changes: 24 additions & 7 deletions cluster/kubernetes/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"
"github.com/weaveworks/flux/image"
"github.com/weaveworks/flux/resource"
Expand Down Expand Up @@ -33,6 +34,8 @@ type Manifests struct {
Namespacer namespacer
}

var _ cluster.Manifests = &Manifests{}

func getCRDScopes(manifests map[string]kresource.KubeManifest) ResourceScopes {
result := ResourceScopes{}
for _, km := range manifests {
Expand Down Expand Up @@ -60,9 +63,9 @@ func getCRDScopes(manifests map[string]kresource.KubeManifest) ResourceScopes {
return result
}

func postProcess(manifests map[string]kresource.KubeManifest, nser namespacer) (map[string]resource.Resource, error) {
func postProcess(manifests map[string]kresource.KubeManifest, nser namespacer) ([]resource.Resource, error) {
knownScopes := getCRDScopes(manifests)
result := map[string]resource.Resource{}
result := []resource.Resource{}
for _, km := range manifests {
if nser != nil {
ns, err := nser.EffectiveNamespace(km, knownScopes)
Expand All @@ -71,21 +74,35 @@ func postProcess(manifests map[string]kresource.KubeManifest, nser namespacer) (
}
km.SetNamespace(ns)
}
result[km.ResourceID().String()] = km
result = append(result, km)
}
return result, nil
}

func (c *Manifests) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) {
manifests, err := kresource.Load(base, paths)
func (c *Manifests) LoadManifests(baseDir string, paths []string) ([]resource.Resource, error) {
// TODO(fons): is there a need for Load to return a map?
manifests, err := kresource.Load(baseDir, paths)
if err != nil {
return nil, err
}
return postProcess(manifests, c.Namespacer)
}

func (c *Manifests) UpdateImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) {
func (c *Manifests) ParseManifest(def []byte, source string) ([]resource.Resource, error) {
// TODO(fons): is there a need for ParseMultidoc to return a map?
resources, err := kresource.ParseMultidoc(def, source)
if err != nil {
return nil, err
}
var result []resource.Resource
for _, r := range resources {
result = append(result, r)
}
return result, nil
}

func (c *Manifests) SetWorkloadContainerImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) {
return updateWorkload(def, id, container, image)
}

// UpdatePolicies and ServicesWithPolicies in policies.go
// UpdateWorkloadPolicies in policies.go
12 changes: 9 additions & 3 deletions cluster/kubernetes/manifests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"

"github.com/weaveworks/flux/cluster/kubernetes/testfiles"
)

Expand Down Expand Up @@ -55,8 +57,12 @@ metadata:
t.Fatal(err)
}

if _, ok := resources["bar:foo/fooinstance"]; !ok {
t.Fatal("couldn't find crd instance")
found := false
for _, r := range resources {
if r.ResourceID().String() == "bar:foo/fooinstance" {
found = true
break
}
}

assert.True(t, found)
}
56 changes: 22 additions & 34 deletions cluster/kubernetes/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,22 @@ import (
"fmt"

"github.com/pkg/errors"
"gopkg.in/yaml.v2"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/resource"
)

func (m *Manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error) {
ns, kind, name := id.Components()
func (m *Manifests) GetAnnotationChangesForPolicyUpdate(workload resource.Workload, update policy.Update) ([]cluster.AnnotationChange, error) {
add, del := update.Add, update.Remove

// We may be sent the pseudo-policy `policy.TagAll`, which means
// apply this filter to all containers. To do so, we need to know
// what all the containers are.
if tagAll, ok := update.Add.Get(policy.TagAll); ok {
add = add.Without(policy.TagAll)
containers, err := extractContainers(def, id)
if err != nil {
return nil, err
}

for _, container := range containers {
for _, container := range workload.Containers() {
if tagAll == policy.PatternAll.String() {
del = del.Add(policy.TagPrefix(container.Name))
} else {
Expand All @@ -35,38 +28,20 @@ func (m *Manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy
}
}

var args []string
result := []cluster.AnnotationChange{}
for pol, val := range add {
if policy.Tag(pol) && !policy.NewPattern(val).Valid() {
return nil, fmt.Errorf("invalid tag pattern: %q", val)
}
args = append(args, fmt.Sprintf("%s%s=%s", kresource.PolicyPrefix, pol, val))
result = append(result, cluster.AnnotationChange{kresource.PolicyPrefix + string(pol), &val})
}
for pol, _ := range del {
args = append(args, fmt.Sprintf("%s%s=", kresource.PolicyPrefix, pol))
result = append(result, cluster.AnnotationChange{kresource.PolicyPrefix + string(pol), nil})
}

return (KubeYAML{}).Annotate(def, ns, kind, name, args...)
}

type manifest struct {
Metadata struct {
Annotations map[string]string `yaml:"annotations"`
} `yaml:"metadata"`
return result, nil
}

func extractAnnotations(def []byte) (map[string]string, error) {
var m manifest
if err := yaml.Unmarshal(def, &m); err != nil {
return nil, errors.Wrap(err, "decoding manifest for annotations")
}
if m.Metadata.Annotations == nil {
return map[string]string{}, nil
}
return m.Metadata.Annotations, nil
}

func extractContainers(def []byte, id flux.ResourceID) ([]resource.Container, error) {
func (m *Manifests) UpdateWorkloadPolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error) {
resources, err := kresource.ParseMultidoc(def, "stdin")
if err != nil {
return nil, err
Expand All @@ -79,5 +54,18 @@ func extractContainers(def []byte, id flux.ResourceID) ([]resource.Container, er
if !ok {
return nil, errors.New("resource " + id.String() + " does not have containers")
}
return workload.Containers(), nil
if err != nil {
return nil, err
}
changes, err := m.GetAnnotationChangesForPolicyUpdate(workload, update)
var args []string
for _, change := range changes {
value := ""
if change.AnnotationValue != nil {
value = *change.AnnotationValue
}
args = append(args, fmt.Sprintf("%s=%s", change.AnnotationKey, value))
}
ns, kind, name := id.Components()
return (KubeYAML{}).Annotate(def, ns, kind, name, args...)
}
5 changes: 3 additions & 2 deletions cluster/kubernetes/policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestUpdatePolicies(t *testing.T) {
caseIn := templToString(t, annotationsTemplate, c.in)
caseOut := templToString(t, annotationsTemplate, c.out)
resourceID := flux.MustParseResourceID("default:deployment/nginx")
out, err := (&Manifests{}).UpdatePolicies([]byte(caseIn), resourceID, c.update)
out, err := (&Manifests{}).UpdateWorkloadPolicies([]byte(caseIn), resourceID, c.update)
assert.Equal(t, c.wantErr, err != nil)
if !c.wantErr {
assert.Equal(t, string(out), caseOut)
Expand All @@ -185,14 +185,15 @@ func TestUpdatePolicies_invalidTagPattern(t *testing.T) {
update := policy.Update{
Add: policy.Set{policy.TagPrefix("nginx"): "semver:invalid"},
}
_, err := (&Manifests{}).UpdatePolicies(nil, resourceID, update)
_, err := (&Manifests{}).UpdateWorkloadPolicies(nil, resourceID, update)
assert.Error(t, err)
}

var annotationsTemplate = template.Must(template.New("").Parse(`---
apiVersion: extensions/v1beta1
kind: Deployment
metadata: # comment really close to the war zone
namespace: default
name: nginx{{with .}}
annotations:{{range .}}
{{index . 0}}: {{printf "%s" (index . 1)}}{{end}}{{end}}
Expand Down
8 changes: 6 additions & 2 deletions cluster/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/weaveworks/flux/cluster"
kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"
fluxfake "github.com/weaveworks/flux/integrations/client/clientset/versioned/fake"
"github.com/weaveworks/flux/resource"
"github.com/weaveworks/flux/sync"
)

Expand Down Expand Up @@ -323,8 +324,11 @@ metadata:
if err != nil {
t.Fatal(err)
}

err = sync.Sync("testset", resources, kube)
resourcesByID := map[string]resource.Resource{}
for _, r := range resources {
resourcesByID[r.ResourceID().String()] = r
}
err = sync.Sync("testset", resourcesByID, kube)
if !expectErrors && err != nil {
t.Error(err)
}
Expand Down
66 changes: 17 additions & 49 deletions cluster/manifests.go
Original file line number Diff line number Diff line change
@@ -1,68 +1,36 @@
package cluster

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/image"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/resource"
)

type ManifestError struct {
error
}

func ErrResourceNotFound(name string) error {
return ManifestError{fmt.Errorf("manifest for resource %s not found under manifests path", name)}
// An Annotation change indicates how an annotation should be changed
type AnnotationChange struct {
AnnotationKey string
// AnnotationValue is the value to set the anntoation to, nil indicates delete
AnnotationValue *string
}

// Manifests represents how a set of files are used as definitions of
// Manifests represents a set of files containing definitions of
// resources, e.g., in Kubernetes, YAML files describing Kubernetes
// resources.
type Manifests interface {
// Update the image in a manifest's bytes to that given
UpdateImage(def []byte, resourceID flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
// Load all the resource manifests under the paths
// given. `baseDir` is used to relativise the paths, which are
// supplied as absolute paths to directories or files; at least
// one path should be supplied, even if it is the same as `baseDir`.
LoadManifests(baseDir string, paths []string) (map[string]resource.Resource, error)
// UpdatePolicies modifies a manifest to apply the policy update specified
UpdatePolicies([]byte, flux.ResourceID, policy.Update) ([]byte, error)
}

// UpdateManifest looks for the manifest for the identified resource,
// reads its contents, applies f(contents), and writes the results
// back to the file.
func UpdateManifest(m Manifests, root string, paths []string, id flux.ResourceID, f func(manifest []byte) ([]byte, error)) error {
resources, err := m.LoadManifests(root, paths)
if err != nil {
return err
}

resource, ok := resources[id.String()]
if !ok {
return ErrResourceNotFound(id.String())
}

path := filepath.Join(root, resource.Source())
def, err := ioutil.ReadFile(path)
if err != nil {
return err
}

newDef, err := f(def)
if err != nil {
return err
}

fi, err := os.Stat(path)
if err != nil {
return err
}
return ioutil.WriteFile(path, newDef, fi.Mode())
LoadManifests(baseDir string, paths []string) ([]resource.Resource, error)
// ParseManifest parses the content of a manifest and its source location into resources
ParseManifest(def []byte, source string) ([]resource.Resource, error)
// Set the image of a container in a manifest's bytes to that given
SetWorkloadContainerImage(def []byte, resourceID flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
// UpdatWorkloadPolicies modifies a manifest to apply the policy update specified
UpdateWorkloadPolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error)
// TODO(fons): the following function (in conjunction with the one above) is horrible, maybe move
// GetAnnotationChangesForPolicyUpdate to its own interface and pass that to UpdateWorkloadPolicies?
// GetAnnotationChangesForPolicyUpdate translates a policy update into annotation updates
GetAnnotationChangesForPolicyUpdate(workload resource.Workload, update policy.Update) ([]AnnotationChange, error)
}
43 changes: 28 additions & 15 deletions cluster/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,23 @@ import (

// Doubles as a cluster.Cluster and cluster.Manifests implementation
type Mock struct {
AllWorkloadsFunc func(maybeNamespace string) ([]Workload, error)
SomeWorkloadsFunc func([]flux.ResourceID) ([]Workload, error)
IsAllowedResourceFunc func(flux.ResourceID) bool
PingFunc func() error
ExportFunc func() ([]byte, error)
SyncFunc func(SyncSet) error
PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error)
UpdateImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
LoadManifestsFunc func(base string, paths []string) (map[string]resource.Resource, error)
UpdatePoliciesFunc func([]byte, flux.ResourceID, policy.Update) ([]byte, error)
AllWorkloadsFunc func(maybeNamespace string) ([]Workload, error)
SomeWorkloadsFunc func([]flux.ResourceID) ([]Workload, error)
IsAllowedResourceFunc func(flux.ResourceID) bool
PingFunc func() error
ExportFunc func() ([]byte, error)
SyncFunc func(SyncSet) error
PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error)
SetWorkloadContainerImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
LoadManifestsFunc func(base string, paths []string) ([]resource.Resource, error)
ParseManifestFunc func(def []byte, source string) ([]resource.Resource, error)
UpdateWorkloadPoliciesFunc func([]byte, flux.ResourceID, policy.Update) ([]byte, error)
GetAnnotationChangesForPolicyUpdateFunc func(workload resource.Workload, update policy.Update) ([]AnnotationChange, error)
}

var _ Cluster = &Mock{}
var _ Manifests = &Mock{}

func (m *Mock) AllWorkloads(maybeNamespace string) ([]Workload, error) {
return m.AllWorkloadsFunc(maybeNamespace)
}
Expand Down Expand Up @@ -50,14 +55,22 @@ func (m *Mock) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) {
return m.PublicSSHKeyFunc(regenerate)
}

func (m *Mock) UpdateImage(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) {
return m.UpdateImageFunc(def, id, container, newImageID)
func (m *Mock) SetWorkloadContainerImage(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) {
return m.SetWorkloadContainerImageFunc(def, id, container, newImageID)
}

func (m *Mock) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) {
func (m *Mock) LoadManifests(base string, paths []string) ([]resource.Resource, error) {
return m.LoadManifestsFunc(base, paths)
}

func (m *Mock) UpdatePolicies(def []byte, id flux.ResourceID, p policy.Update) ([]byte, error) {
return m.UpdatePoliciesFunc(def, id, p)
func (m *Mock) ParseManifest(def []byte, source string) ([]resource.Resource, error) {
return m.ParseManifestFunc(def, source)
}

func (m *Mock) UpdateWorkloadPolicies(def []byte, id flux.ResourceID, p policy.Update) ([]byte, error) {
return m.UpdateWorkloadPoliciesFunc(def, id, p)
}

func (m *Mock) GetAnnotationChangesForPolicyUpdate(workload resource.Workload, update policy.Update) ([]AnnotationChange, error) {
return m.GetAnnotationChangesForPolicyUpdateFunc(workload, update)
}
Loading

0 comments on commit 3fc3952

Please sign in to comment.