Skip to content

Commit

Permalink
Merge pipeline (#2625)
Browse files Browse the repository at this point in the history
* Merge pipeline

* Updated tests

* Update logic to adding merge key

* Add ideal testcase

* Suggested changes

* Support exec
  • Loading branch information
phanimarupaka authored Jan 18, 2022
1 parent c737224 commit 6be0da6
Show file tree
Hide file tree
Showing 4 changed files with 927 additions and 51 deletions.
5 changes: 4 additions & 1 deletion internal/util/update/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3352,6 +3352,7 @@ func TestRun_remote_subpackages(t *testing.T) {
WithUpstreamRef("foo", "/", "v1.0", "resource-merge").
WithUpstreamLockRef("foo", "/", "v1.0", 0).
WithPipeline(
pkgbuilder.NewFunction("gcr.io/kpt-dev/bar:latest"),
pkgbuilder.NewFunction("gcr.io/kpt-dev/foo:latest"),
),
).
Expand Down Expand Up @@ -3405,6 +3406,7 @@ func TestRun_remote_subpackages(t *testing.T) {
WithUpstreamLockRef(testutil.Upstream, "/", masterBranch, 1).
WithPipeline(
pkgbuilder.NewFunction("gcr.io/kpt-dev/foo:v1"),
pkgbuilder.NewFunction("gcr.io/kpt-dev/zork:v1"),
pkgbuilder.NewFunction("gcr.io/kpt-dev/bar:v1"),
),
).
Expand Down Expand Up @@ -3457,6 +3459,7 @@ func TestRun_remote_subpackages(t *testing.T) {
pkgbuilder.NewKptfile().
WithPipeline(
pkgbuilder.NewFunction("gcr.io/kpt-dev/foo:v1"),
pkgbuilder.NewFunction("gcr.io/kpt-dev/bar:latest"),
),
).
WithResource(pkgbuilder.ConfigMapResource),
Expand Down Expand Up @@ -3491,7 +3494,6 @@ func TestRun_remote_subpackages(t *testing.T) {
WithUpstreamLockRef("foo", "/", masterBranch, 0).
WithPipeline(
pkgbuilder.NewFunction("gcr.io/kpt-dev/zork:v1"),
pkgbuilder.NewFunction("gcr.io/kpt-dev/foo:v1"),
),
).
WithResource(pkgbuilder.ConfigMapResource),
Expand All @@ -3511,6 +3513,7 @@ func TestRun_remote_subpackages(t *testing.T) {
WithUpstreamRef("foo", "/", masterBranch, "resource-merge").
WithUpstreamLockRef("foo", "/", masterBranch, 1).
WithPipeline(
pkgbuilder.NewFunction("gcr.io/kpt-dev/zork:v1"),
pkgbuilder.NewFunction("gcr.io/kpt-dev/foo:latest"),
pkgbuilder.NewFunction("gcr.io/kpt-dev/bar:latest"),
),
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/kptfile/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ type Function struct {
// `ConfigMap` is a convenient way to specify a function config of kind ConfigMap.
ConfigMap map[string]string `yaml:"configMap,omitempty" json:"configMap,omitempty"`

// `Name` is used to uniquely identify the function declaration
// this is primarily used for merging function declaration with upstream counterparts
Name string `yaml:"name,omitempty" json:"name,omitempty"`

// `Selectors` are used to specify resources on which the function should be executed
// if not specified, all resources are selected
Selectors []Selector `yaml:"selectors,omitempty" json:"selectors,omitempty"`
Expand Down
109 changes: 108 additions & 1 deletion pkg/kptfile/kptfileutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/GoogleContainerTools/kpt/internal/types"
"github.com/GoogleContainerTools/kpt/internal/util/git"
kptfilev1 "github.com/GoogleContainerTools/kpt/pkg/api/kptfile/v1"
"sigs.k8s.io/kustomize/kyaml/sets"
"sigs.k8s.io/kustomize/kyaml/yaml"
"sigs.k8s.io/kustomize/kyaml/yaml/merge3"
)
Expand Down Expand Up @@ -223,7 +224,15 @@ func UpdateUpstreamLockFromGit(path string, spec *git.RepoSpec) error {
return nil
}

// merge merges the Kptfiles from various sources and updates localKf with output
// please refer to https://github.com/GoogleContainerTools/kpt/blob/main/docs/design-docs/03-pipeline-merge.md
// for related design
func merge(localKf, updatedKf, originalKf *kptfilev1.KptFile) error {
shouldAddSyntheticMergeName := shouldAddFnKey(localKf, updatedKf, originalKf)
if shouldAddSyntheticMergeName {
addNameForMerge(localKf, updatedKf, originalKf)
}

localBytes, err := yaml.Marshal(localKf)
if err != nil {
return err
Expand All @@ -239,7 +248,7 @@ func merge(localKf, updatedKf, originalKf *kptfilev1.KptFile) error {
return err
}

mergedBytes, err := merge3.MergeStrings(string(localBytes), string(originalBytes), string(updatedBytes), false)
mergedBytes, err := merge3.MergeStrings(string(localBytes), string(originalBytes), string(updatedBytes), true)
if err != nil {
return err
}
Expand All @@ -250,6 +259,10 @@ func merge(localKf, updatedKf, originalKf *kptfilev1.KptFile) error {
return err
}

if shouldAddSyntheticMergeName {
removeFnKey(localKf, updatedKf, originalKf, &mergedKf)
}

// Copy the merged content into the local Kptfile struct. We don't copy
// name, namespace, Upstream or UpstreamLock, since we don't want those
// merged.
Expand All @@ -261,6 +274,100 @@ func merge(localKf, updatedKf, originalKf *kptfilev1.KptFile) error {
return nil
}

// shouldAddFnKey returns true iff all the functions from all sources
// doesn't have name field set and there are no duplicate function declarations,
// it means the user is unaware of name field, and we use image name or exec field
// value as mergeKey instead of name in such cases
func shouldAddFnKey(kfs ...*kptfilev1.KptFile) bool {
for _, kf := range kfs {
if kf == nil || kf.Pipeline == nil {
continue
}
if !shouldAddFnKeyUtil(kf.Pipeline.Mutators) || !shouldAddFnKeyUtil(kf.Pipeline.Validators) {
return false
}
}
return true
}

// shouldAddFnKeyUtil returns true iff all the functions from input list
// doesn't have name field set and there are no duplicate function declarations,
// it means the user is unaware of name field, and we use image name or exec field
// value as mergeKey instead of name in such cases
func shouldAddFnKeyUtil(fns []kptfilev1.Function) bool {
keySet := sets.String{}
for _, fn := range fns {
if fn.Name != "" {
return false
}
var key string
if fn.Exec != "" {
key = fn.Exec
} else {
key = strings.Split(fn.Image, ":")[0]
}
if keySet.Has(key) {
return false
}
keySet.Insert(key)
}
return true
}

// addNameForMerge adds name field for all the functions if empty
// name is primarily used as merge-key
func addNameForMerge(kfs ...*kptfilev1.KptFile) {
for _, kf := range kfs {
if kf == nil || kf.Pipeline == nil {
continue
}
for i, mutator := range kf.Pipeline.Mutators {
kf.Pipeline.Mutators[i] = addName(mutator)
}
for i, validator := range kf.Pipeline.Validators {
kf.Pipeline.Validators[i] = addName(validator)
}
}
}

// addName adds name field to the input function if empty
// name is nothing but image name in this case as we use it as fall back mergeKey
func addName(fn kptfilev1.Function) kptfilev1.Function {
if fn.Name != "" {
return fn
}
var key string
if fn.Exec != "" {
key = fn.Exec
} else {
parts := strings.Split(fn.Image, ":")
if len(parts) > 0 {
key = parts[0]
}
}
fn.Name = fmt.Sprintf("_kpt-merge_%s", key)
return fn
}

// removeFnKey remove the synthesized function name field before writing
func removeFnKey(kfs ...*kptfilev1.KptFile) {
for _, kf := range kfs {
if kf == nil || kf.Pipeline == nil {
continue
}
for i := range kf.Pipeline.Mutators {
if strings.HasPrefix(kf.Pipeline.Mutators[i].Name, "_kpt-merge_") {
kf.Pipeline.Mutators[i].Name = ""
}
}
for i := range kf.Pipeline.Validators {
if strings.HasPrefix(kf.Pipeline.Validators[i].Name, "_kpt-merge_") {
kf.Pipeline.Validators[i].Name = ""
}
}
}
}

func updateUpstreamAndUpstreamLock(localKf, updatedKf *kptfilev1.KptFile) {
if updatedKf.Upstream != nil {
localKf.Upstream = updatedKf.Upstream
Expand Down
Loading

0 comments on commit 6be0da6

Please sign in to comment.