diff --git a/cluster/kubernetes/kubernetes.go b/cluster/kubernetes/kubernetes.go index d7c3654e0..942694983 100644 --- a/cluster/kubernetes/kubernetes.go +++ b/cluster/kubernetes/kubernetes.go @@ -6,16 +6,21 @@ package kubernetes import ( "bytes" + "fmt" k8syaml "github.com/ghodss/yaml" "github.com/go-kit/kit/log" + "github.com/pkg/errors" "gopkg.in/yaml.v2" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/discovery" k8sclient "k8s.io/client-go/kubernetes" + v1beta1apps "k8s.io/client-go/kubernetes/typed/apps/v1beta1" v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1beta1extensions "k8s.io/client-go/kubernetes/typed/extensions/v1beta1" + "k8s.io/client-go/pkg/api" + apiv1 "k8s.io/client-go/pkg/api/v1" "github.com/weaveworks/flux" "github.com/weaveworks/flux/cluster" @@ -33,6 +38,7 @@ type extendedClient struct { discovery.DiscoveryInterface v1core.CoreV1Interface v1beta1extensions.ExtensionsV1beta1Interface + v1beta1apps.StatefulSetsGetter } type apiObject struct { @@ -107,7 +113,7 @@ func NewCluster(clientset k8sclient.Interface, logger log.Logger) (*Cluster, error) { c := &Cluster{ - client: extendedClient{clientset.Discovery(), clientset.Core(), clientset.Extensions()}, + client: extendedClient{clientset.Discovery(), clientset.Core(), clientset.Extensions(), clientset.AppsV1beta1()}, applier: applier, actionc: make(chan func()), logger: logger, @@ -138,11 +144,21 @@ func (c *Cluster) loop() { func (c *Cluster) SomeControllers(ids []flux.ResourceID) (res []cluster.Controller, err error) { var controllers []cluster.Controller for _, id := range ids { - controller, err := MakeController(c, id) + ns, kind, name := id.Components() + + resourceKind, ok := resourceKinds[kind] + if !ok { + return nil, fmt.Errorf("Unsupported kind %v", kind) + } + + podController, err := resourceKind.getPodController(c, ns, name) if err != nil { return nil, err } - controllers = append(controllers, *controller) + + if !isAddon(podController) { + controllers = append(controllers, podController.toClusterController(id)) + } } return controllers, nil } @@ -161,12 +177,19 @@ func (c *Cluster) AllControllers(namespace string) (res []cluster.Controller, er continue } - controllers, err := MakeAllControllers(c, ns.Name) - if err != nil { - return nil, err - } + for kind, resourceKind := range resourceKinds { + podControllers, err := resourceKind.getPodControllers(c, ns.Name) + if err != nil { + return nil, err + } - allControllers = append(allControllers, controllers...) + for _, podController := range podControllers { + if !isAddon(podController) { + id := flux.MakeResourceID(ns.Name, kind, podController.name) + allControllers = append(allControllers, podController.toClusterController(id)) + } + } + } } return allControllers, nil @@ -228,8 +251,19 @@ func (c *Cluster) Export() ([]byte, error) { return nil, errors.Wrap(err, "marshalling namespace to YAML") } - if err := AppendYAML(c, ns.Name, &config); err != nil { - return nil, err + for _, resourceKind := range resourceKinds { + podControllers, err := resourceKind.getPodControllers(c, ns.Name) + if err != nil { + return nil, err + } + + for _, pc := range podControllers { + if !isAddon(pc) { + if err := appendYAML(&config, pc.apiVersion, pc.kind, pc.apiObject); err != nil { + return nil, err + } + } + } } } return config.Bytes(), nil @@ -261,9 +295,92 @@ func (c *Cluster) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) { return publicKey, nil } +func mergeCredentials(c *Cluster, namespace string, podTemplate apiv1.PodTemplateSpec, imageCreds registry.ImageCreds) { + creds := registry.NoCredentials() + for _, imagePullSecret := range podTemplate.Spec.ImagePullSecrets { + secret, err := c.client.Secrets(namespace).Get(imagePullSecret.Name, meta_v1.GetOptions{}) + if err != nil { + c.logger.Log("err", errors.Wrapf(err, "getting secret %q from namespace %q", secret.Name, namespace)) + continue + } + + var decoded []byte + var ok bool + // These differ in format; but, ParseCredentials will + // handle either. + switch api.SecretType(secret.Type) { + case api.SecretTypeDockercfg: + decoded, ok = secret.Data[api.DockerConfigKey] + case api.SecretTypeDockerConfigJson: + decoded, ok = secret.Data[api.DockerConfigJsonKey] + default: + c.logger.Log("skip", "unknown type", "secret", namespace+"/"+secret.Name, "type", secret.Type) + continue + } + + if !ok { + c.logger.Log("err", errors.Wrapf(err, "retrieving pod secret %q", secret.Name)) + continue + } + + // Parse secret + crd, err := registry.ParseCredentials(decoded) + if err != nil { + c.logger.Log("err", err.Error()) + continue + } + + // Merge into the credentials for this PodSpec + creds.Merge(crd) + } + + // Now create the service and attach the credentials + for _, container := range podTemplate.Spec.Containers { + r, err := flux.ParseImageID(container.Image) + if err != nil { + c.logger.Log("err", err.Error()) + continue + } + imageCreds[r] = creds + } +} + // ImagesToFetch is a k8s specific method to get a list of images to update along with their credentials func (c *Cluster) ImagesToFetch() registry.ImageCreds { - return MakeAllImageCreds(c) + allImageCreds := make(registry.ImageCreds) + + namespaces, err := c.client.Namespaces().List(meta_v1.ListOptions{}) + if err != nil { + c.logger.Log("err", errors.Wrap(err, "getting namespaces")) + return allImageCreds + } + + for _, ns := range namespaces.Items { + for _, resourceKind := range resourceKinds { + podControllers, err := resourceKind.getPodControllers(c, ns.Name) + if err != nil { + c.logger.Log("err", errors.Wrapf(err, "getting kind %s for namespace %s", resourceKind, ns.Name)) + continue + } + + imageCreds := make(registry.ImageCreds) + for _, podController := range podControllers { + mergeCredentials(c, ns.Name, podController.podTemplate, imageCreds) + } + + // Merge creds + for imageID, creds := range imageCreds { + existingCreds, ok := allImageCreds[imageID] + if ok { + existingCreds.Merge(creds) + } else { + allImageCreds[imageID] = creds + } + } + } + } + + return allImageCreds } // --- end cluster.Cluster diff --git a/cluster/kubernetes/resourcekinds.go b/cluster/kubernetes/resourcekinds.go index e2b4b782a..b44e31c44 100644 --- a/cluster/kubernetes/resourcekinds.go +++ b/cluster/kubernetes/resourcekinds.go @@ -1,93 +1,22 @@ package kubernetes import ( - "bytes" "fmt" - "github.com/pkg/errors" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/pkg/api" apiv1 "k8s.io/client-go/pkg/api/v1" apiext "k8s.io/client-go/pkg/apis/extensions/v1beta1" "github.com/weaveworks/flux" "github.com/weaveworks/flux/cluster" - "github.com/weaveworks/flux/registry" ) -// MakeController builds a cluster.Controller for a specific resourceID. -func MakeController(c *Cluster, id flux.ResourceID) (*cluster.Controller, error) { - _, kind, _ := id.Components() - - resourceKind := resourceKinds[kind] - if resourceKind == nil { - return nil, fmt.Errorf("Unsupported kind %v", kind) - } - - return resourceKind.makeController(c, id) -} - -// MakeAllControllers builds a cluster.Controller for all supported kinds of resource -// in the specified namespace. -func MakeAllControllers(c *Cluster, namespace string) ([]cluster.Controller, error) { - var allControllers []cluster.Controller - for _, resourceKind := range resourceKinds { - controllers, err := resourceKind.makeAllControllers(c, namespace) - if err != nil { - return nil, err - } - allControllers = append(allControllers, controllers...) - } - return allControllers, nil -} - -// MakeAllImageCreds returns a credentials map for all images specified by every kind of -// supported resource. -func MakeAllImageCreds(c *Cluster) registry.ImageCreds { - allImageCreds := make(registry.ImageCreds) - - namespaces, err := c.client.Namespaces().List(meta_v1.ListOptions{}) - if err != nil { - c.logger.Log("err", errors.Wrap(err, "getting namespaces")) - return allImageCreds - } - - for _, ns := range namespaces.Items { - for _, resourceKind := range resourceKinds { - imageCreds := resourceKind.makeAllImageCreds(c, ns.Name) - - // Merge creds - for imageID, creds := range imageCreds { - existingCreds, ok := allImageCreds[imageID] - if ok { - existingCreds.Merge(creds) - } else { - allImageCreds[imageID] = creds - } - } - } - } - - return allImageCreds -} - -func AppendYAML(c *Cluster, namespace string, buffer *bytes.Buffer) error { - for _, resourceKind := range resourceKinds { - if err := resourceKind.appendYAML(c, namespace, buffer); err != nil { - return err - } - } - return nil -} - ///////////////////////////////////////////////////////////////////////////// // Kind registry type resourceKind interface { - makeController(c *Cluster, id flux.ResourceID) (*cluster.Controller, error) - makeAllControllers(c *Cluster, namespace string) ([]cluster.Controller, error) - makeAllImageCreds(c *Cluster, namespace string) registry.ImageCreds - appendYAML(c *Cluster, namespace string, buffer *bytes.Buffer) error + getPodController(c *Cluster, namespace, name string) (podController, error) + getPodControllers(c *Cluster, namespace string) ([]podController, error) } var ( @@ -99,157 +28,88 @@ func init() { resourceKinds["daemonset"] = &daemonSetKind{} } -///////////////////////////////////////////////////////////////////////////// -// Common kind utility functions - -func mergeCredentials(c *Cluster, namespace string, podTemplate apiv1.PodTemplateSpec, imageCreds registry.ImageCreds) { - creds := registry.NoCredentials() - for _, imagePullSecret := range podTemplate.Spec.ImagePullSecrets { - secret, err := c.client.Secrets(namespace).Get(imagePullSecret.Name, meta_v1.GetOptions{}) - if err != nil { - c.logger.Log("err", errors.Wrapf(err, "getting secret %q from namespace %q", secret.Name, namespace)) - continue - } - - var decoded []byte - var ok bool - // These differ in format; but, ParseCredentials will - // handle either. - switch api.SecretType(secret.Type) { - case api.SecretTypeDockercfg: - decoded, ok = secret.Data[api.DockerConfigKey] - case api.SecretTypeDockerConfigJson: - decoded, ok = secret.Data[api.DockerConfigJsonKey] - default: - c.logger.Log("skip", "unknown type", "secret", namespace+"/"+secret.Name, "type", secret.Type) - continue - } - - if !ok { - c.logger.Log("err", errors.Wrapf(err, "retrieving pod secret %q", secret.Name)) - continue - } - - // Parse secret - crd, err := registry.ParseCredentials(decoded) - if err != nil { - c.logger.Log("err", err.Error()) - continue - } - - // Merge into the credentials for this PodSpec - creds.Merge(crd) - } - - // Now create the service and attach the credentials - for _, container := range podTemplate.Spec.Containers { - r, err := flux.ParseImageID(container.Image) - if err != nil { - c.logger.Log("err", err.Error()) - continue - } - imageCreds[r] = creds - } +type podController struct { + apiVersion string + kind string + name string + status string + podTemplate apiv1.PodTemplateSpec + apiObject interface{} } -func makeController(id flux.ResourceID, status string, containers []apiv1.Container) *cluster.Controller { +func (pc podController) toClusterController(resourceID flux.ResourceID) cluster.Controller { var clusterContainers []cluster.Container - for _, container := range containers { + for _, container := range pc.podTemplate.Spec.Containers { clusterContainers = append(clusterContainers, cluster.Container{Name: container.Name, Image: container.Image}) } - return &cluster.Controller{ - ID: id, - Status: status, + return cluster.Controller{ + ID: resourceID, + Status: pc.status, Containers: cluster.ContainersOrExcuse{Containers: clusterContainers}, } } +func (pc podController) GetNamespace() string { + objectMeta := pc.apiObject.(namespacedLabeled) + return objectMeta.GetNamespace() +} + +func (pc podController) GetLabels() map[string]string { + objectMeta := pc.apiObject.(namespacedLabeled) + return objectMeta.GetLabels() +} + ///////////////////////////////////////////////////////////////////////////// // extensions/v1beta1 Deployment type deploymentKind struct{} -func (*deploymentKind) makeController(c *Cluster, id flux.ResourceID) (*cluster.Controller, error) { - ns, _, name := id.Components() - - deployment, err := c.client.Deployments(ns).Get(name, meta_v1.GetOptions{}) +func (dk *deploymentKind) getPodController(c *Cluster, namespace, name string) (podController, error) { + deployment, err := c.client.Deployments(namespace).Get(name, meta_v1.GetOptions{}) if err != nil { - return nil, errors.Wrapf(err, "fetching deployment %s for namespace %S", name, ns) - } - if isAddon(deployment) { - return nil, nil - } - return makeDeploymentController(id, deployment), nil -} - -func (*deploymentKind) makeAllControllers(c *Cluster, namespace string) ([]cluster.Controller, error) { - var controllers []cluster.Controller - - deployments, err := c.client.Deployments(namespace).List(meta_v1.ListOptions{}) - if err != nil { - return nil, errors.Wrapf(err, "getting deployments for namespace %s", namespace) - } - - for _, deployment := range deployments.Items { - if !isAddon(&deployment) { - id := flux.MakeResourceID(namespace, "deployment", deployment.Name) - controllers = append(controllers, *makeDeploymentController(id, &deployment)) - } + return podController{}, err } - return controllers, nil + return makeDeploymentPodController(deployment), nil } -func (*deploymentKind) makeAllImageCreds(c *Cluster, namespace string) registry.ImageCreds { - imageCreds := make(registry.ImageCreds) - +func (dk *deploymentKind) getPodControllers(c *Cluster, namespace string) ([]podController, error) { deployments, err := c.client.Deployments(namespace).List(meta_v1.ListOptions{}) if err != nil { - c.logger.Log("err", errors.Wrapf(err, "getting deployments for namespace %s", namespace)) - return imageCreds + return nil, err } + var podControllers []podController for _, deployment := range deployments.Items { - mergeCredentials(c, namespace, deployment.Spec.Template, imageCreds) + podControllers = append(podControllers, makeDeploymentPodController(&deployment)) } - return imageCreds -} - -func (*deploymentKind) appendYAML(c *Cluster, namespace string, buffer *bytes.Buffer) error { - deployments, err := c.client.Deployments(namespace).List(meta_v1.ListOptions{}) - if err != nil { - return errors.Wrap(err, "getting deployments") - } - for _, deployment := range deployments.Items { - if isAddon(&deployment) { - continue - } - if err := appendYAML(buffer, "extensions/v1beta1", "Deployment", deployment); err != nil { - return errors.Wrap(err, "marshalling deployment to YAML") - } - } - return nil + return podControllers, nil } -// makeDeploymentController builds a cluster.Controller from a kubernetes Deployment -func makeDeploymentController(id flux.ResourceID, deployment *apiext.Deployment) *cluster.Controller { - var statusMessage string - meta, status := deployment.ObjectMeta, deployment.Status - if status.ObservedGeneration >= meta.Generation { +func makeDeploymentPodController(deployment *apiext.Deployment) podController { + var status string + objectMeta, deploymentStatus := deployment.ObjectMeta, deployment.Status + if deploymentStatus.ObservedGeneration >= objectMeta.Generation { // the definition has been updated; now let's see about the replicas - updated, wanted := status.UpdatedReplicas, *deployment.Spec.Replicas + updated, wanted := deploymentStatus.UpdatedReplicas, *deployment.Spec.Replicas if updated == wanted { - statusMessage = StatusReady + status = StatusReady } else { - statusMessage = fmt.Sprintf("%d out of %d updated", updated, wanted) + status = fmt.Sprintf("%d out of %d updated", updated, wanted) } } else { - statusMessage = StatusUpdating + status = StatusUpdating } - return makeController(id, statusMessage, deployment.Spec.Template.Spec.Containers) + return podController{ + apiVersion: "extensions/v1beta1", + kind: "Deployment", + name: deployment.ObjectMeta.Name, + status: status, + podTemplate: deployment.Spec.Template, + apiObject: deployment} } ///////////////////////////////////////////////////////////////////////////// @@ -257,85 +117,51 @@ func makeDeploymentController(id flux.ResourceID, deployment *apiext.Deployment) type daemonSetKind struct{} -func (*daemonSetKind) makeController(c *Cluster, id flux.ResourceID) (*cluster.Controller, error) { - ns, _, name := id.Components() - - daemonSet, err := c.client.DaemonSets(ns).Get(name, meta_v1.GetOptions{}) +func (dk *daemonSetKind) getPodController(c *Cluster, namespace, name string) (podController, error) { + daemonSet, err := c.client.DaemonSets(namespace).Get(name, meta_v1.GetOptions{}) if err != nil { - return nil, errors.Wrapf(err, "fetching daemonSet %s for namespace %S", name, ns) - } - if isAddon(daemonSet) { - return nil, nil - } - return makeDaemonSetController(id, daemonSet), nil -} - -func (*daemonSetKind) makeAllControllers(c *Cluster, namespace string) ([]cluster.Controller, error) { - var controllers []cluster.Controller - daemonSets, err := c.client.DaemonSets(namespace).List(meta_v1.ListOptions{}) - if err != nil { - return nil, errors.Wrapf(err, "getting daemonSets for namespace %s", namespace) - } - - for _, daemonSet := range daemonSets.Items { - if !isAddon(&daemonSet) { - id := flux.MakeResourceID(namespace, "daemonSet", daemonSet.Name) - controllers = append(controllers, *makeDaemonSetController(id, &daemonSet)) - } + return podController{}, err } - return controllers, nil + return makeDaemonSetPodController(daemonSet), nil } -func (*daemonSetKind) makeAllImageCreds(c *Cluster, namespace string) registry.ImageCreds { - imageCreds := make(registry.ImageCreds) - +func (dk *daemonSetKind) getPodControllers(c *Cluster, namespace string) ([]podController, error) { daemonSets, err := c.client.DaemonSets(namespace).List(meta_v1.ListOptions{}) if err != nil { - c.logger.Log("err", errors.Wrapf(err, "getting daemonSets for namespace %s", namespace)) - return imageCreds + return nil, err } + var podControllers []podController for _, daemonSet := range daemonSets.Items { - mergeCredentials(c, namespace, daemonSet.Spec.Template, imageCreds) + podControllers = append(podControllers, makeDaemonSetPodController(&daemonSet)) } - return imageCreds -} - -func (*daemonSetKind) appendYAML(c *Cluster, namespace string, buffer *bytes.Buffer) error { - daemonSets, err := c.client.DaemonSets(namespace).List(meta_v1.ListOptions{}) - if err != nil { - return errors.Wrap(err, "getting daemonSets") - } - for _, daemonSet := range daemonSets.Items { - if isAddon(&daemonSet) { - continue - } - if err := appendYAML(buffer, "extensions/v1beta1", "DaemonSet", daemonSet); err != nil { - return errors.Wrap(err, "marshalling daemonSet to YAML") - } - } - return nil + return podControllers, nil } -// makeDaemonSetController builds a cluster.Controller from a kubernetes DaemonSet -func makeDaemonSetController(id flux.ResourceID, daemonSet *apiext.DaemonSet) *cluster.Controller { - var statusMessage string - meta, status := daemonSet.ObjectMeta, daemonSet.Status - if status.ObservedGeneration >= meta.Generation { +func makeDaemonSetPodController(daemonSet *apiext.DaemonSet) podController { + var status string + objectMeta, daemonSetStatus := daemonSet.ObjectMeta, daemonSet.Status + if daemonSetStatus.ObservedGeneration >= objectMeta.Generation { // the definition has been updated; now let's see about the replicas - updated, wanted := status.UpdatedNumberScheduled, status.DesiredNumberScheduled + updated, wanted := daemonSetStatus.UpdatedNumberScheduled, daemonSetStatus.DesiredNumberScheduled if updated == wanted { - statusMessage = StatusReady + status = StatusReady } else { - statusMessage = fmt.Sprintf("%d out of %d updated", updated, wanted) + status = fmt.Sprintf("%d out of %d updated", updated, wanted) } } else { - statusMessage = StatusUpdating + status = StatusUpdating } - return makeController(id, statusMessage, daemonSet.Spec.Template.Spec.Containers) + return podController{ + apiVersion: "extensions/v1beta1", + kind: "DaemonSet", + name: daemonSet.ObjectMeta.Name, + status: status, + podTemplate: daemonSet.Spec.Template, + apiObject: daemonSet} } /////////////////////////////////////////////////////////////////////////////