Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1931 from 2opremio/1925-contigous-yaml-EOD
Browse files Browse the repository at this point in the history
Harden and simplify yaml stream decoding/encoding
  • Loading branch information
Alfonso Acosta authored Apr 15, 2019
2 parents 2c0176c + abe9db8 commit 5ee497b
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 137 deletions.
2 changes: 1 addition & 1 deletion cluster/kubernetes/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (c *Cluster) ImagesToFetch() registry.ImageCreds {

imageCreds := make(registry.ImageCreds)
for _, workload := range workloads {
logger := log.With(c.logger, "resource", flux.MakeResourceID(ns.Name, kind, workload.name))
logger := log.With(c.logger, "resource", flux.MakeResourceID(ns.Name, kind, workload.GetName()))
mergeCredentials(logger.Log, c.includeImage, c.client, ns.Name, workload.podTemplate, imageCreds, seenCreds)
}

Expand Down
40 changes: 24 additions & 16 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package kubernetes

import (
"bytes"
"encoding/json"
"fmt"
"sync"

k8syaml "github.com/ghodss/yaml"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
apiv1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -58,6 +59,7 @@ func MakeClusterClientset(core coreClient, dyn dynamicClient, fluxhelm fluxHelmC
// Kubernetes metadata. These methods are implemented by the
// Kubernetes API resource types.
type k8sObject interface {
GetName() string
GetNamespace() string
GetLabels() map[string]string
GetAnnotations() map[string]string
Expand Down Expand Up @@ -186,7 +188,7 @@ func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err er

for _, workload := range workloads {
if !isAddon(workload) {
id := flux.MakeResourceID(ns.Name, kind, workload.name)
id := flux.MakeResourceID(ns.Name, kind, workload.GetName())
c.muSyncErrors.RLock()
workload.syncError = c.syncErrors[id]
c.muSyncErrors.RUnlock()
Expand Down Expand Up @@ -222,8 +224,14 @@ func (c *Cluster) Export() ([]byte, error) {
return nil, errors.Wrap(err, "getting namespaces")
}

encoder := yaml.NewEncoder(&config)
defer encoder.Close()

for _, ns := range namespaces {
err := appendYAML(&config, "v1", "Namespace", ns)
// kind & apiVersion must be set, since TypeMeta is not populated
ns.Kind = "Namespace"
ns.APIVersion = "v1"
err := encoder.Encode(yamlThroughJSON{ns})
if err != nil {
return nil, errors.Wrap(err, "marshalling namespace to YAML")
}
Expand All @@ -246,7 +254,7 @@ func (c *Cluster) Export() ([]byte, error) {

for _, pc := range workloads {
if !isAddon(pc) {
if err := appendYAML(&config, pc.apiVersion, pc.kind, pc.k8sObject); err != nil {
if err := encoder.Encode(yamlThroughJSON{pc.k8sObject}); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -325,18 +333,18 @@ func (c *Cluster) IsAllowedResource(id flux.ResourceID) bool {
return false
}

// kind & apiVersion must be passed separately as the object's TypeMeta is not populated
func appendYAML(buffer *bytes.Buffer, apiVersion, kind string, object interface{}) error {
yamlBytes, err := k8syaml.Marshal(object)
type yamlThroughJSON struct {
toMarshal interface{}
}

func (y yamlThroughJSON) MarshalYAML() (interface{}, error) {
rawJSON, err := json.Marshal(y.toMarshal)
if err != nil {
return err
return nil, fmt.Errorf("error marshaling into JSON: %s", err)
}
var jsonObj interface{}
if err = yaml.Unmarshal(rawJSON, &jsonObj); err != nil {
return nil, fmt.Errorf("error unmarshaling from JSON: %s", err)
}
buffer.WriteString("---\n")
buffer.WriteString("apiVersion: ")
buffer.WriteString(apiVersion)
buffer.WriteString("\nkind: ")
buffer.WriteString(kind)
buffer.WriteString("\n")
buffer.Write(yamlBytes)
return nil
return jsonObj, nil
}
73 changes: 21 additions & 52 deletions cluster/kubernetes/resource/load.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package resource

import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"

"github.com/pkg/errors"
"gopkg.in/yaml.v2"
)

// Load takes paths to directories or files, and creates an object set
Expand Down Expand Up @@ -128,21 +129,26 @@ func looksLikeChart(dir string) bool {
// constructs an object set from the resources represented therein.
func ParseMultidoc(multidoc []byte, source string) (map[string]KubeManifest, error) {
objs := map[string]KubeManifest{}
chunks := bufio.NewScanner(bytes.NewReader(multidoc))
initialBuffer := make([]byte, 4096) // Matches startBufSize in bufio/scan.go
chunks.Buffer(initialBuffer, 1024*1024) // Allow growth to 1MB
chunks.Split(splitYAMLDocument)

decoder := yaml.NewDecoder(bytes.NewReader(multidoc))
var obj KubeManifest
var err error
for chunks.Scan() {
// It's not guaranteed that the return value of Bytes() will not be mutated later:
// https://golang.org/pkg/bufio/#Scanner.Bytes
// But we will be snaffling it away, so make a copy.
bytes := chunks.Bytes()
bytes2 := make([]byte, len(bytes), cap(bytes))
copy(bytes2, bytes)
if obj, err = unmarshalObject(source, bytes2); err != nil {
for {
// In order to use the decoder to extract raw documents
// from the stream, we decode generically and encode again.
// The result is the raw document from the stream
// (pretty-printed and without comments).
// NOTE: gopkg.in/yaml.v3 supports round-tripping comments
// by using `gopkg.in/yaml.v3.Node`.
var val interface{}
if err := decoder.Decode(&val); err != nil {
break
}
bytes, err := yaml.Marshal(val)
if err != nil {
return nil, errors.Wrapf(err, "parsing YAML doc from %q", source)
}

if obj, err = unmarshalObject(source, bytes); err != nil {
return nil, errors.Wrapf(err, "parsing YAML doc from %q", source)
}
if obj == nil {
Expand All @@ -159,45 +165,8 @@ func ParseMultidoc(multidoc []byte, source string) (map[string]KubeManifest, err
}
}

if err := chunks.Err(); err != nil {
if err != io.EOF {
return objs, errors.Wrapf(err, "scanning multidoc from %q", source)
}
return objs, nil
}

// ---
// Taken directly from https://github.com/kubernetes/apimachinery/blob/master/pkg/util/yaml/decoder.go.

// yamlSeparator is the byte sequence that marks the boundary between two
// documents in a YAML stream: a newline immediately followed by three dashes.
const yamlSeparator = "\n---"

// splitYAMLDocument is a bufio.SplitFunc that cuts a YAML stream into
// individual documents.
//
// A document ends at the first occurrence of yamlSeparator. The token is
// everything before that separator; the advance additionally consumes the
// separator and the remainder of its line, up to and including the next
// newline. When no separator is present the whole of data is returned as the
// final token, but only once atEOF is set. In every other situation the
// function returns (0, nil, nil).
func splitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if len(data) == 0 && atEOF {
		return 0, nil, nil
	}

	sepStart := bytes.Index(data, []byte(yamlSeparator))
	if sepStart < 0 {
		// No separator in sight: at EOF the remainder is the last document,
		// otherwise more data is needed.
		if atEOF {
			return len(data), data, nil
		}
		return 0, nil, nil
	}

	sepEnd := sepStart + len(yamlSeparator)
	rest := data[sepEnd:]

	if len(rest) == 0 {
		// The separator is the very last thing in the buffer, so nothing
		// more can be read after it.
		if !atEOF {
			return 0, nil, nil
		}
		return len(data), data[:sepStart], nil
	}

	// The separator line must be terminated by a newline before the
	// preceding document is emitted.
	newline := bytes.IndexByte(rest, '\n')
	if newline < 0 {
		return 0, nil, nil
	}
	return sepEnd + newline + 1, data[:sepStart], nil
}

// ---
21 changes: 21 additions & 0 deletions cluster/kubernetes/resource/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,27 @@ data:
}
}

// TestParseBoundaryMarkers feeds ParseMultidoc a stream made of a single
// ConfigMap document followed by four consecutive pairs of `---` and `...`
// marker lines, and expects parsing to succeed with exactly one resource.
func TestParseBoundaryMarkers(t *testing.T) {
doc := `---
kind: ConfigMap
metadata:
name: bigmap
---
...
---
...
---
...
---
...
`
buffer := bytes.NewBufferString(doc)

// The marker-only sections after the ConfigMap must not produce an error
// or contribute additional resources.
resources, err := ParseMultidoc(buffer.Bytes(), "test")
assert.NoError(t, err)
assert.Len(t, resources, 1)
}

func TestParseCronJob(t *testing.T) {
doc := `---
apiVersion: batch/v1beta1
Expand Down
41 changes: 17 additions & 24 deletions cluster/kubernetes/resourcekinds.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ func init() {

type workload struct {
k8sObject
apiVersion string
kind string
name string
status string
rollout cluster.RolloutStatus
syncError error
Expand Down Expand Up @@ -179,11 +176,10 @@ func makeDeploymentWorkload(deployment *apiapps.Deployment) workload {
status = cluster.StatusError
}
}

// apiVersion & kind must be set, since TypeMeta is not populated
deployment.APIVersion = "apps/v1"
deployment.Kind = "Deployment"
return workload{
apiVersion: "apps/v1",
kind: "Deployment",
name: deployment.ObjectMeta.Name,
status: status,
rollout: rollout,
podTemplate: deployment.Spec.Template,
Expand Down Expand Up @@ -241,10 +237,10 @@ func makeDaemonSetWorkload(daemonSet *apiapps.DaemonSet) workload {
}
}

// apiVersion & kind must be set, since TypeMeta is not populated
daemonSet.APIVersion = "apps/v1"
daemonSet.Kind = "DaemonSet"
return workload{
apiVersion: "apps/v1",
kind: "DaemonSet",
name: daemonSet.ObjectMeta.Name,
status: status,
rollout: rollout,
podTemplate: daemonSet.Spec.Template,
Expand Down Expand Up @@ -334,10 +330,10 @@ func makeStatefulSetWorkload(statefulSet *apiapps.StatefulSet) workload {
}
}

// apiVersion & kind must be set, since TypeMeta is not populated
statefulSet.APIVersion = "apps/v1"
statefulSet.Kind = "StatefulSet"
return workload{
apiVersion: "apps/v1",
kind: "StatefulSet",
name: statefulSet.ObjectMeta.Name,
status: status,
rollout: rollout,
podTemplate: statefulSet.Spec.Template,
Expand Down Expand Up @@ -373,10 +369,9 @@ func (dk *cronJobKind) getWorkloads(c *Cluster, namespace string) ([]workload, e
}

func makeCronJobWorkload(cronJob *apibatch.CronJob) workload {
cronJob.APIVersion = "batch/v1beta1"
cronJob.Kind = "CronJob"
return workload{
apiVersion: "batch/v1beta1",
kind: "CronJob",
name: cronJob.ObjectMeta.Name,
status: cluster.StatusReady,
podTemplate: cronJob.Spec.JobTemplate.Spec.Template,
k8sObject: cronJob}
Expand Down Expand Up @@ -419,11 +414,10 @@ func makeFluxHelmReleaseWorkload(fluxHelmRelease *fhr_v1alpha2.FluxHelmRelease)
ImagePullSecrets: []apiv1.LocalObjectReference{},
},
}

// apiVersion & kind must be set, since TypeMeta is not populated
fluxHelmRelease.APIVersion = "helm.integrations.flux.weave.works/v1alpha2"
fluxHelmRelease.Kind = "FluxHelmRelease"
return workload{
apiVersion: "helm.integrations.flux.weave.works/v1alpha2",
kind: "FluxHelmRelease",
name: fluxHelmRelease.ObjectMeta.Name,
status: fluxHelmRelease.Status.ReleaseStatus,
podTemplate: podTemplate,
k8sObject: fluxHelmRelease,
Expand Down Expand Up @@ -482,11 +476,10 @@ func makeHelmReleaseWorkload(helmRelease *fhr_v1beta1.HelmRelease) workload {
ImagePullSecrets: []apiv1.LocalObjectReference{},
},
}

// apiVersion & kind must be set, since TypeMeta is not populated
helmRelease.APIVersion = "flux.weave.works/v1beta1"
helmRelease.Kind = "HelmRelease"
return workload{
apiVersion: "flux.weave.works/v1beta1",
kind: "HelmRelease",
name: helmRelease.ObjectMeta.Name,
status: helmRelease.Status.ReleaseStatus,
podTemplate: podTemplate,
k8sObject: helmRelease,
Expand Down
Loading

0 comments on commit 5ee497b

Please sign in to comment.