Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
feat: scan cluster stack and compare hash
Browse files Browse the repository at this point in the history
  • Loading branch information
Timer committed Oct 11, 2018
1 parent 20cc90b commit 8acf3da
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 6 deletions.
1 change: 1 addition & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Cluster interface {
SomeControllers([]flux.ResourceID) ([]Controller, error)
Ping() error
Export() ([]byte, error)
ExportByLabel(string, string) ([]byte, error)
Sync(SyncDef, map[string]policy.Update, map[string]policy.Update) error
PublicSSHKey(regenerate bool) (ssh.PublicKey, error)
}
Expand Down
79 changes: 79 additions & 0 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubernetes
import (
"bytes"
"fmt"
"strings"
"sync"

k8syaml "github.com/ghodss/yaml"
Expand All @@ -16,6 +17,8 @@ import (
apiv1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
k8sclientdynamic "k8s.io/client-go/dynamic"
k8sclient "k8s.io/client-go/kubernetes"

"github.com/weaveworks/flux"
Expand All @@ -33,10 +36,12 @@ const (
)

type coreClient k8sclient.Interface
type dynamicClient k8sclientdynamic.Interface
type fluxHelmClient fhrclient.Interface

type extendedClient struct {
coreClient
dynamicClient
fluxHelmClient
}

Expand Down Expand Up @@ -116,6 +121,7 @@ type Cluster struct {

// NewCluster returns a usable cluster.
func NewCluster(clientset k8sclient.Interface,
dynamicClientset k8sclientdynamic.Interface,
fluxHelmClientset fhrclient.Interface,
applier Applier,
sshKeyRing ssh.KeyRing,
Expand All @@ -125,6 +131,7 @@ func NewCluster(clientset k8sclient.Interface,
c := &Cluster{
client: extendedClient{
clientset,
dynamicClientset,
fluxHelmClientset,
},
applier: applier,
Expand Down Expand Up @@ -335,6 +342,78 @@ func (c *Cluster) Export() ([]byte, error) {
return config.Bytes(), nil
}

// contains reports whether x occurs in the slice a.
func contains(a []string, x string) bool {
	for i := range a {
		if a[i] == x {
			return true
		}
	}
	return false
}

// ExportByLabel collects every listable cluster resource carrying the label
// labelName=labelValue and returns the definitions as one multi-document
// YAML stream (documents separated by "---").
//
// Subresources (names containing "/", e.g. "pods/status") and resources
// without the "list" verb are skipped. ComponentStatus and Endpoints items
// are excluded because they are cluster-managed rather than user-supplied
// (see the k8s-client stack implementation linked below).
func (c *Cluster) ExportByLabel(labelName string, labelValue string) ([]byte, error) {
	var config bytes.Buffer

	resources, err := c.client.coreClient.Discovery().ServerResources()
	if err != nil {
		return nil, err
	}

	// The selector is the same for every list call; build it once.
	selector := fmt.Sprintf("%s=%s", labelName, labelValue)

	for _, resource := range resources {
		// Split the loop-invariant GroupVersion ("group/version", or just
		// "version" for the core group) once per resource list, not once
		// per API resource.
		var group, version string
		groupVersion := resource.GroupVersion
		if strings.Contains(groupVersion, "/") {
			a := strings.SplitN(groupVersion, "/", 2)
			group = a[0]
			version = a[1]
		} else {
			group = ""
			version = groupVersion
		}

		for _, apiResource := range resource.APIResources {
			// Skip subresources and anything that can't be listed.
			if strings.Contains(apiResource.Name, "/") || !contains(apiResource.Verbs, "list") {
				continue
			}

			resourceClient := c.client.dynamicClient.Resource(schema.GroupVersionResource{
				Group:    group,
				Version:  version,
				Resource: apiResource.Name,
			})
			data, err := resourceClient.List(meta_v1.ListOptions{
				LabelSelector: selector,
			})
			if err != nil {
				return nil, fmt.Errorf("listing %s in %s: %v", apiResource.Name, groupVersion, err)
			}

			for _, item := range data.Items {
				itemDesc := fmt.Sprintf("%s:%s", item.GetAPIVersion(), item.GetKind())
				// https://github.com/kontena/k8s-client/blob/6e9a7ba1f03c255bd6f06e8724a1c7286b22e60f/lib/k8s/stack.rb#L17-L22
				if itemDesc == "v1:ComponentStatus" || itemDesc == "v1:Endpoints" {
					continue
				}

				yamlBytes, err := k8syaml.Marshal(item.Object)
				if err != nil {
					return nil, fmt.Errorf("marshalling %s to YAML: %v", itemDesc, err)
				}
				config.WriteString("---\n")
				config.Write(yamlBytes)
				config.WriteString("\n")
			}
		}
	}

	return config.Bytes(), nil
}

// kind & apiVersion must be passed separately as the object's TypeMeta is not populated
func appendYAML(buffer *bytes.Buffer, apiVersion, kind string, object interface{}) error {
yamlBytes, err := k8syaml.Marshal(object)
Expand Down
8 changes: 7 additions & 1 deletion cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/pflag"
k8sifclient "github.com/weaveworks/flux/integrations/client/clientset/versioned"
k8sclientdynamic "k8s.io/client-go/dynamic"
k8sclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

Expand Down Expand Up @@ -199,6 +200,11 @@ func main() {
logger.Log("err", err)
os.Exit(1)
}
dynamicClientset, err := k8sclientdynamic.NewForConfig(restClientConfig)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}

ifclientset, err := k8sifclient.NewForConfig(restClientConfig)
if err != nil {
Expand Down Expand Up @@ -253,7 +259,7 @@ func main() {
logger.Log("kubectl", kubectl)

kubectlApplier := kubernetes.NewKubectl(kubectl, restClientConfig)
k8sInst := kubernetes.NewCluster(clientset, ifclientset, kubectlApplier, sshKeyRing, logger, *k8sNamespaceWhitelist)
k8sInst := kubernetes.NewCluster(clientset, dynamicClientset, ifclientset, kubectlApplier, sshKeyRing, logger, *k8sNamespaceWhitelist)

if err := k8sInst.Ping(); err != nil {
logger.Log("ping", err)
Expand Down
1 change: 1 addition & 0 deletions daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
// TODO supply deletes argument from somewhere (command-line?)
// TODO: supply tracking argument from somewhere
if err := fluxsync.Sync(d.Manifests, allResources, d.Cluster, true, false, logger); err != nil {
fmt.Printf("err %v %s", err, err)
logger.Log("err", err)
switch syncerr := err.(type) {
case cluster.SyncError:
Expand Down
40 changes: 35 additions & 5 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package sync
import (
"crypto/sha1"
"encoding/hex"
"fmt"
"sort"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"

"github.com/weaveworks/flux/cluster"
kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/resource"
)
Expand Down Expand Up @@ -48,11 +50,12 @@ func Sync(m cluster.Manifests, repoResources map[string]resource.Resource, clus
// no-op.
sync := cluster.SyncDef{}

var stackName, stackChecksum string
resourceLabels := map[string]policy.Update{}
resourcePolicyUpdates := map[string]policy.Update{}
if tracks {
stackName := "default" // TODO: multiple stack support
stackChecksum := getStackChecksum(repoResources)
stackName = "default" // TODO: multiple stack support
stackChecksum = getStackChecksum(repoResources)

logger.Log("stack", stackName, "checksum", stackChecksum)

Expand All @@ -65,8 +68,6 @@ func Sync(m cluster.Manifests, repoResources map[string]resource.Resource, clus
Add: policy.Set{policy.StackChecksum: stackChecksum},
}
}

// label flux.weave.works/stack
}

// DANGER ZONE (tamara) This works and is dangerous. At the moment will delete Flux and
Expand All @@ -82,7 +83,36 @@ func Sync(m cluster.Manifests, repoResources map[string]resource.Resource, clus
prepareSyncApply(logger, clusterResources, id, res, &sync)
}

return clus.Sync(sync, resourceLabels, resourcePolicyUpdates)
if err := clus.Sync(sync, resourceLabels, resourcePolicyUpdates); err != nil {
return err
}

if tracks {
logger.Log("scanning cluster stack for orphaned resources", stackName)
clusterResourceBytes, err := clus.ExportByLabel(fmt.Sprintf("%s%s", kresource.PolicyPrefix, "stack"), stackName)
if err != nil {
return errors.Wrap(err, "exporting resource defs from cluster post-sync")
}
clusterResources, err = m.ParseManifests(clusterResourceBytes)
if err != nil {
return errors.Wrap(err, "parsing exported resources post-sync")
}

for resourceID, res := range clusterResources {
if res.Policy().Has(policy.StackChecksum) {
val, _ := res.Policy().Get(policy.StackChecksum)
if val != stackChecksum {
logger.Log("cluster resource", resourceID, "invalid checksum", val)
} else {
logger.Log("cluster resource ok", resourceID)
}
} else {
logger.Log("cluster resource", resourceID, "missing policy", policy.StackChecksum)
}
}
}

return nil
}

func prepareSyncDelete(logger log.Logger, repoResources map[string]resource.Resource, id string, res resource.Resource, sync *cluster.SyncDef) {
Expand Down

0 comments on commit 8acf3da

Please sign in to comment.