From 9d2e1a0ae2eb4d723f79ea3e697241fab4493e4d Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Fri, 12 Apr 2019 21:04:13 +0200 Subject: [PATCH 1/4] Harden yaml stream parsing of manifests files Flux choked on end-of-document markers (`...`). To avoid complicating the existing multidoc parser (stolen from kubernetes) I abused go-yaml's Decoder to obtain the raw documents from the stream by unmarshalling to an interface{} and marshalling again. --- cluster/kubernetes/resource/load.go | 71 +++++++----------------- cluster/kubernetes/resource/load_test.go | 21 +++++++ 2 files changed, 40 insertions(+), 52 deletions(-) diff --git a/cluster/kubernetes/resource/load.go b/cluster/kubernetes/resource/load.go index 69f3efec4..6a4e96327 100644 --- a/cluster/kubernetes/resource/load.go +++ b/cluster/kubernetes/resource/load.go @@ -1,14 +1,15 @@ package resource import ( - "bufio" "bytes" "fmt" + "io" "io/ioutil" "os" "path/filepath" "github.com/pkg/errors" + "gopkg.in/yaml.v2" ) // Load takes paths to directories or files, and creates an object set @@ -128,21 +129,24 @@ func looksLikeChart(dir string) bool { // constructs an object set from the resources represented therein. func ParseMultidoc(multidoc []byte, source string) (map[string]KubeManifest, error) { objs := map[string]KubeManifest{} - chunks := bufio.NewScanner(bytes.NewReader(multidoc)) - initialBuffer := make([]byte, 4096) // Matches startBufSize in bufio/scan.go - chunks.Buffer(initialBuffer, 1024*1024) // Allow growth to 1MB - chunks.Split(splitYAMLDocument) - + decoder := yaml.NewDecoder(bytes.NewReader(multidoc)) var obj KubeManifest var err error - for chunks.Scan() { - // It's not guaranteed that the return value of Bytes() will not be mutated later: - // https://golang.org/pkg/bufio/#Scanner.Bytes - // But we will be snaffling it away, so make a copy. 
- bytes := chunks.Bytes() - bytes2 := make([]byte, len(bytes), cap(bytes)) - copy(bytes2, bytes) - if obj, err = unmarshalObject(source, bytes2); err != nil { + for { + // In order to use the decoder to extract raw documents + // from the stream, we decode generically and encode again + // The result it the raw document (pretty-printed and + // without comments though) + var val interface{} + if err := decoder.Decode(&val); err != nil { + break + } + bytes, err := yaml.Marshal(val) + if err != nil { + return nil, errors.Wrapf(err, "parsing YAML doc from %q", source) + } + + if obj, err = unmarshalObject(source, bytes); err != nil { return nil, errors.Wrapf(err, "parsing YAML doc from %q", source) } if obj == nil { @@ -159,45 +163,8 @@ func ParseMultidoc(multidoc []byte, source string) (map[string]KubeManifest, err } } - if err := chunks.Err(); err != nil { + if err != io.EOF { return objs, errors.Wrapf(err, "scanning multidoc from %q", source) } return objs, nil } - -// --- -// Taken directly from https://github.com/kubernetes/apimachinery/blob/master/pkg/util/yaml/decoder.go. - -const yamlSeparator = "\n---" - -// splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into individual documents. -func splitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) { - if atEOF && len(data) == 0 { - return 0, nil, nil - } - sep := len([]byte(yamlSeparator)) - if i := bytes.Index(data, []byte(yamlSeparator)); i >= 0 { - // We have a potential document terminator - i += sep - after := data[i:] - if len(after) == 0 { - // we can't read any more characters - if atEOF { - return len(data), data[:len(data)-sep], nil - } - return 0, nil, nil - } - if j := bytes.IndexByte(after, '\n'); j >= 0 { - return i + j + 1, data[0 : i-sep], nil - } - return 0, nil, nil - } - // If we're at EOF, we have a final, non-terminated line. Return it. - if atEOF { - return len(data), data, nil - } - // Request more data. 
- return 0, nil, nil -} - -// --- diff --git a/cluster/kubernetes/resource/load_test.go b/cluster/kubernetes/resource/load_test.go index 3108622e2..0ea8facfa 100644 --- a/cluster/kubernetes/resource/load_test.go +++ b/cluster/kubernetes/resource/load_test.go @@ -121,6 +121,27 @@ data: } } +func TestParseBoundaryMarkers(t *testing.T) { + doc := `--- +kind: ConfigMap +metadata: + name: bigmap +--- +... +--- +... +--- +... +--- +... +` + buffer := bytes.NewBufferString(doc) + + resources, err := ParseMultidoc(buffer.Bytes(), "test") + assert.NoError(t, err) + assert.Len(t, resources, 1) +} + func TestParseCronJob(t *testing.T) { doc := `--- apiVersion: batch/v1beta1 From 8edd13f0f6647f368e8865be03e600df3d930701 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Sat, 13 Apr 2019 02:42:02 +0200 Subject: [PATCH 2/4] Harden yaml stream decoding in `fluxctl save` Use go-yaml's Decoder instead of our own --- cluster/kubernetes/resource/load.go | 6 ++-- cmd/fluxctl/save_cmd.go | 53 +++++------------------------ 2 files changed, 12 insertions(+), 47 deletions(-) diff --git a/cluster/kubernetes/resource/load.go b/cluster/kubernetes/resource/load.go index 6a4e96327..1d4c1d176 100644 --- a/cluster/kubernetes/resource/load.go +++ b/cluster/kubernetes/resource/load.go @@ -134,9 +134,9 @@ func ParseMultidoc(multidoc []byte, source string) (map[string]KubeManifest, err var err error for { // In order to use the decoder to extract raw documents - // from the stream, we decode generically and encode again - // The result it the raw document (pretty-printed and - // without comments though) + // from the stream, we decode generically and encode again. 
+ // The result is the raw document from the stream + // (pretty-printed and without comments) var val interface{} if err := decoder.Decode(&val); err != nil { break diff --git a/cmd/fluxctl/save_cmd.go b/cmd/fluxctl/save_cmd.go index 0913061b2..662c1d3ad 100644 --- a/cmd/fluxctl/save_cmd.go +++ b/cmd/fluxctl/save_cmd.go @@ -1,7 +1,6 @@ package main import ( - "bufio" "bytes" "context" "fmt" @@ -11,7 +10,7 @@ import ( "github.com/pkg/errors" "github.com/spf13/cobra" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" ) type saveOpts struct { @@ -63,9 +62,6 @@ func (opts *saveOpts) RunE(cmd *cobra.Command, args []string) error { return errors.Wrap(err, "exporting config") } - yamls := bufio.NewScanner(bytes.NewReader(config)) - yamls.Split(splitYAMLDocument) - if opts.path != "-" { // check supplied path is a directory if info, err := os.Stat(opts.path); err != nil { @@ -75,11 +71,14 @@ func (opts *saveOpts) RunE(cmd *cobra.Command, args []string) error { } } - for yamls.Scan() { + decoder := yaml.NewDecoder(bytes.NewReader(config)) + + var decoderErr error + for { var object saveObject // Most unwanted fields are ignored at this point - if err := yaml.Unmarshal(yamls.Bytes(), &object); err != nil { - return errors.Wrap(err, "unmarshalling exported yaml") + if decoderErr = decoder.Decode(&object); decoderErr != nil { + break } // Filter out remaining unwanted keys from unstructured fields @@ -91,8 +90,8 @@ func (opts *saveOpts) RunE(cmd *cobra.Command, args []string) error { } } - if yamls.Err() != nil { - return errors.Wrap(yamls.Err(), "splitting exported yaml") + if decoderErr != io.EOF { + return errors.Wrap(decoderErr, "unmarshalling exported yaml") } return nil @@ -211,37 +210,3 @@ func abbreviateKind(kind string) string { return kind } } - -// Copied from k8s.io/client-go/1.5/pkg/util/yaml/decoder.go - -const yamlSeparator = "\n---" - -// splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into individual documents. 
-func splitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) { - if atEOF && len(data) == 0 { - return 0, nil, nil - } - sep := len([]byte(yamlSeparator)) - if i := bytes.Index(data, []byte(yamlSeparator)); i >= 0 { - // We have a potential document terminator - i += sep - after := data[i:] - if len(after) == 0 { - // we can't read any more characters - if atEOF { - return len(data), data[:len(data)-sep], nil - } - return 0, nil, nil - } - if j := bytes.IndexByte(after, '\n'); j >= 0 { - return i + j + 1, data[0 : i-sep], nil - } - return 0, nil, nil - } - // If we're at EOF, we have a final, non-terminated line. Return it. - if atEOF { - return len(data), data, nil - } - // Request more data. - return 0, nil, nil -} From fb4fba6a70560826dda57ae7136fc9306706030d Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Sat, 13 Apr 2019 16:04:49 +0200 Subject: [PATCH 3/4] Add note about preserving comments with gopkg.in/yaml.v3 --- cluster/kubernetes/resource/load.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cluster/kubernetes/resource/load.go b/cluster/kubernetes/resource/load.go index 1d4c1d176..e5352783f 100644 --- a/cluster/kubernetes/resource/load.go +++ b/cluster/kubernetes/resource/load.go @@ -137,6 +137,8 @@ func ParseMultidoc(multidoc []byte, source string) (map[string]KubeManifest, err // from the stream, we decode generically and encode again. // The result is the raw document from the stream // (pretty-printed and without comments) + // NOTE: gopkg.in/yaml.v3 supports round tripping comments + // by using `gopkg.in/yaml.v3.Node`. 
var val interface{} if err := decoder.Decode(&val); err != nil { break From abe9db8febf88e926350c78099e852fe52e0c943 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Sun, 14 Apr 2019 19:33:47 +0200 Subject: [PATCH 4/4] Clean up yaml encoding in Export() --- cluster/kubernetes/images.go | 2 +- cluster/kubernetes/kubernetes.go | 40 +++++++++++++++++----------- cluster/kubernetes/resourcekinds.go | 41 ++++++++++++----------------- 3 files changed, 42 insertions(+), 41 deletions(-) diff --git a/cluster/kubernetes/images.go b/cluster/kubernetes/images.go index 1f8190d19..dccf30684 100644 --- a/cluster/kubernetes/images.go +++ b/cluster/kubernetes/images.go @@ -143,7 +143,7 @@ func (c *Cluster) ImagesToFetch() registry.ImageCreds { imageCreds := make(registry.ImageCreds) for _, workload := range workloads { - logger := log.With(c.logger, "resource", flux.MakeResourceID(ns.Name, kind, workload.name)) + logger := log.With(c.logger, "resource", flux.MakeResourceID(ns.Name, kind, workload.GetName())) mergeCredentials(logger.Log, c.includeImage, c.client, ns.Name, workload.podTemplate, imageCreds, seenCreds) } diff --git a/cluster/kubernetes/kubernetes.go b/cluster/kubernetes/kubernetes.go index 0d3d8fb0a..dbbd4d715 100644 --- a/cluster/kubernetes/kubernetes.go +++ b/cluster/kubernetes/kubernetes.go @@ -2,12 +2,13 @@ package kubernetes import ( "bytes" + "encoding/json" "fmt" "sync" - k8syaml "github.com/ghodss/yaml" "github.com/go-kit/kit/log" "github.com/pkg/errors" + "gopkg.in/yaml.v2" apiv1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -58,6 +59,7 @@ func MakeClusterClientset(core coreClient, dyn dynamicClient, fluxhelm fluxHelmC // Kubernetes metadata. These methods are implemented by the // Kubernetes API resource types. 
type k8sObject interface { + GetName() string GetNamespace() string GetLabels() map[string]string GetAnnotations() map[string]string @@ -186,7 +188,7 @@ func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err er for _, workload := range workloads { if !isAddon(workload) { - id := flux.MakeResourceID(ns.Name, kind, workload.name) + id := flux.MakeResourceID(ns.Name, kind, workload.GetName()) c.muSyncErrors.RLock() workload.syncError = c.syncErrors[id] c.muSyncErrors.RUnlock() @@ -222,8 +224,14 @@ func (c *Cluster) Export() ([]byte, error) { return nil, errors.Wrap(err, "getting namespaces") } + encoder := yaml.NewEncoder(&config) + defer encoder.Close() + for _, ns := range namespaces { - err := appendYAML(&config, "v1", "Namespace", ns) + // kind & apiVersion must be set, since TypeMeta is not populated + ns.Kind = "Namespace" + ns.APIVersion = "v1" + err := encoder.Encode(yamlThroughJSON{ns}) if err != nil { return nil, errors.Wrap(err, "marshalling namespace to YAML") } @@ -246,7 +254,7 @@ func (c *Cluster) Export() ([]byte, error) { for _, pc := range workloads { if !isAddon(pc) { - if err := appendYAML(&config, pc.apiVersion, pc.kind, pc.k8sObject); err != nil { + if err := encoder.Encode(yamlThroughJSON{pc.k8sObject}); err != nil { return nil, err } } @@ -325,18 +333,18 @@ func (c *Cluster) IsAllowedResource(id flux.ResourceID) bool { return false } -// kind & apiVersion must be passed separately as the object's TypeMeta is not populated -func appendYAML(buffer *bytes.Buffer, apiVersion, kind string, object interface{}) error { - yamlBytes, err := k8syaml.Marshal(object) +type yamlThroughJSON struct { + toMarshal interface{} +} + +func (y yamlThroughJSON) MarshalYAML() (interface{}, error) { + rawJSON, err := json.Marshal(y.toMarshal) if err != nil { - return err + return nil, fmt.Errorf("error marshaling into JSON: %s", err) + } + var jsonObj interface{} + if err = yaml.Unmarshal(rawJSON, &jsonObj); err != nil { + return nil, 
fmt.Errorf("error unmarshaling from JSON: %s", err) } - buffer.WriteString("---\n") - buffer.WriteString("apiVersion: ") - buffer.WriteString(apiVersion) - buffer.WriteString("\nkind: ") - buffer.WriteString(kind) - buffer.WriteString("\n") - buffer.Write(yamlBytes) - return nil + return jsonObj, nil } diff --git a/cluster/kubernetes/resourcekinds.go b/cluster/kubernetes/resourcekinds.go index 613cfa434..6ffb3bd1d 100644 --- a/cluster/kubernetes/resourcekinds.go +++ b/cluster/kubernetes/resourcekinds.go @@ -49,9 +49,6 @@ func init() { type workload struct { k8sObject - apiVersion string - kind string - name string status string rollout cluster.RolloutStatus syncError error @@ -179,11 +176,10 @@ func makeDeploymentWorkload(deployment *apiapps.Deployment) workload { status = cluster.StatusError } } - + // apiVersion & kind must be set, since TypeMeta is not populated + deployment.APIVersion = "apps/v1" + deployment.Kind = "Deployment" return workload{ - apiVersion: "apps/v1", - kind: "Deployment", - name: deployment.ObjectMeta.Name, status: status, rollout: rollout, podTemplate: deployment.Spec.Template, @@ -241,10 +237,10 @@ func makeDaemonSetWorkload(daemonSet *apiapps.DaemonSet) workload { } } + // apiVersion & kind must be set, since TypeMeta is not populated + daemonSet.APIVersion = "apps/v1" + daemonSet.Kind = "DaemonSet" return workload{ - apiVersion: "apps/v1", - kind: "DaemonSet", - name: daemonSet.ObjectMeta.Name, status: status, rollout: rollout, podTemplate: daemonSet.Spec.Template, @@ -334,10 +330,10 @@ func makeStatefulSetWorkload(statefulSet *apiapps.StatefulSet) workload { } } + // apiVersion & kind must be set, since TypeMeta is not populated + statefulSet.APIVersion = "apps/v1" + statefulSet.Kind = "StatefulSet" return workload{ - apiVersion: "apps/v1", - kind: "StatefulSet", - name: statefulSet.ObjectMeta.Name, status: status, rollout: rollout, podTemplate: statefulSet.Spec.Template, @@ -373,10 +369,9 @@ func (dk *cronJobKind) getWorkloads(c 
*Cluster, namespace string) ([]workload, e } func makeCronJobWorkload(cronJob *apibatch.CronJob) workload { + cronJob.APIVersion = "batch/v1beta1" + cronJob.Kind = "CronJob" return workload{ - apiVersion: "batch/v1beta1", - kind: "CronJob", - name: cronJob.ObjectMeta.Name, status: cluster.StatusReady, podTemplate: cronJob.Spec.JobTemplate.Spec.Template, k8sObject: cronJob} @@ -419,11 +414,10 @@ func makeFluxHelmReleaseWorkload(fluxHelmRelease *fhr_v1alpha2.FluxHelmRelease) ImagePullSecrets: []apiv1.LocalObjectReference{}, }, } - + // apiVersion & kind must be set, since TypeMeta is not populated + fluxHelmRelease.APIVersion = "helm.integrations.flux.weave.works/v1alpha2" + fluxHelmRelease.Kind = "FluxHelmRelease" return workload{ - apiVersion: "helm.integrations.flux.weave.works/v1alpha2", - kind: "FluxHelmRelease", - name: fluxHelmRelease.ObjectMeta.Name, status: fluxHelmRelease.Status.ReleaseStatus, podTemplate: podTemplate, k8sObject: fluxHelmRelease, @@ -482,11 +476,10 @@ func makeHelmReleaseWorkload(helmRelease *fhr_v1beta1.HelmRelease) workload { ImagePullSecrets: []apiv1.LocalObjectReference{}, }, } - + // apiVersion & kind must be set, since TypeMeta is not populated + helmRelease.APIVersion = "flux.weave.works/v1beta1" + helmRelease.Kind = "HelmRelease" return workload{ - apiVersion: "flux.weave.works/v1beta1", - kind: "HelmRelease", - name: helmRelease.ObjectMeta.Name, status: helmRelease.Status.ReleaseStatus, podTemplate: podTemplate, k8sObject: helmRelease,