Skip to content

Commit

Permalink
Enable target overrides for pipeline clusters
Browse files Browse the repository at this point in the history
This is a follow-up to #658 and #779 for jobs.

This change applies label normalization the same way the backend does.
  • Loading branch information
pietern committed Sep 21, 2023
1 parent 46996b8 commit c9c65b1
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 17 deletions.
21 changes: 10 additions & 11 deletions bundle/config/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,24 +131,23 @@ func (r *Resources) SetConfigFilePath(path string) {
}
}

// MergeJobClusters iterates over all jobs and merges their job clusters.
// This is called after applying the target overrides.
func (r *Resources) MergeJobClusters() error {
// Merge iterates over all resources and merges chunks of the
// resource configuration that can be merged. For example, for
// jobs, this merges job cluster definitions and tasks that
// use the same `job_cluster_key`, or `task_key`, respectively.
func (r *Resources) Merge() error {
for _, job := range r.Jobs {
if err := job.MergeJobClusters(); err != nil {
return err
}
}
return nil
}

// MergeTasks iterates over all jobs and merges their tasks.
// This is called after applying the target overrides.
func (r *Resources) MergeTasks() error {
for _, job := range r.Jobs {
if err := job.MergeTasks(); err != nil {
return err
}
}
for _, pipeline := range r.Pipelines {
if err := pipeline.MergeClusters(); err != nil {
return err
}
}
return nil
}
50 changes: 50 additions & 0 deletions bundle/config/resources/pipeline.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package resources

import (
"strings"

"github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/imdario/mergo"
)

type Pipeline struct {
Expand All @@ -13,3 +16,50 @@ type Pipeline struct {

*pipelines.PipelineSpec
}

// MergeClusters merges cluster definitions that share the same label.
//
// The clusters field is a slice, and as such, target overrides are appended
// to it instead of being merged into the base definition. A cluster can be
// identified by its label, so the label is used to figure out which entries
// are overrides of an earlier entry and to merge them into it.
//
// The cluster label is optional and defaults to "default"; all clusters
// without a label are therefore merged together as well. Labels are matched
// case insensitively and are stored lowercased in the result.
//
// On success p.Clusters is replaced by the merged slice, which keeps the
// order in which each label was first seen. It returns the first error
// reported by mergo, in which case p.Clusters is left with normalized labels
// but is otherwise not replaced. A pipeline without a spec is a no-op.
func (p *Pipeline) MergeClusters() error {
	// The spec is an embedded pointer; reading p.Clusters through a nil
	// pointer would panic, and there is nothing to merge in that case.
	if p == nil || p.PipelineSpec == nil {
		return nil
	}

	clusters := make(map[string]*pipelines.PipelineCluster, len(p.Clusters))

	// The capacity is fixed up front so that appending never reallocates;
	// the pointers stored in the map below rely on that to stay valid.
	output := make([]pipelines.PipelineCluster, 0, len(p.Clusters))

	// Normalize cluster labels.
	// If empty, this defaults to "default".
	// Matching is case insensitive, so labels are lowercased.
	for i := range p.Clusters {
		label := p.Clusters[i].Label
		if label == "" {
			label = "default"
		}
		p.Clusters[i].Label = strings.ToLower(label)
	}

	// Target overrides are always appended, so we can iterate in natural order to
	// first find the base definition, and merge instances we encounter later.
	for i := range p.Clusters {
		label := p.Clusters[i].Label

		// Register pipeline cluster with label if not yet seen before.
		ref, ok := clusters[label]
		if !ok {
			output = append(output, p.Clusters[i])
			clusters[label] = &output[len(output)-1]
			continue
		}

		// Merge this instance into the reference: later values override
		// earlier ones and slices are appended.
		err := mergo.Merge(ref, &p.Clusters[i], mergo.WithOverride, mergo.WithAppendSlice)
		if err != nil {
			return err
		}
	}

	// Overwrite resulting slice.
	p.Clusters = output
	return nil
}
76 changes: 76 additions & 0 deletions bundle/config/resources/pipeline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package resources

import (
"strings"
"testing"

"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestPipelineMergeClusters checks that clusters without a label are merged
// into a single cluster labeled "default", while a cluster with a distinct
// label is kept as a separate entry.
func TestPipelineMergeClusters(t *testing.T) {
	p := &Pipeline{
		PipelineSpec: &pipelines.PipelineSpec{
			Clusters: []pipelines.PipelineCluster{
				{
					NodeTypeId: "i3.xlarge",
					NumWorkers: 2,
					PolicyId:   "1234",
				},
				{
					Label:      "maintenance",
					NodeTypeId: "i3.2xlarge",
				},
				{
					NodeTypeId: "i3.2xlarge",
					NumWorkers: 4,
				},
			},
		},
	}

	err := p.MergeClusters()
	require.NoError(t, err)

	// Stop here on a length mismatch; the indexing below would otherwise
	// panic instead of reporting a regular test failure.
	require.Len(t, p.Clusters, 2)
	assert.Equal(t, "default", p.Clusters[0].Label)
	assert.Equal(t, "maintenance", p.Clusters[1].Label)

	// The default cluster was merged with a subsequent one.
	pc0 := p.Clusters[0]
	assert.Equal(t, "i3.2xlarge", pc0.NodeTypeId)
	assert.Equal(t, 4, pc0.NumWorkers)
	assert.Equal(t, "1234", pc0.PolicyId)

	// The maintenance cluster was left untouched.
	pc1 := p.Clusters[1]
	assert.Equal(t, "i3.2xlarge", pc1.NodeTypeId)
}

// TestPipelineMergeClustersCaseInsensitive checks that labels differing only
// in case ("default" and "DEFAULT") are treated as the same cluster and merged.
func TestPipelineMergeClustersCaseInsensitive(t *testing.T) {
	p := &Pipeline{
		PipelineSpec: &pipelines.PipelineSpec{
			Clusters: []pipelines.PipelineCluster{
				{
					Label:      "default",
					NumWorkers: 2,
				},
				{
					Label:      "DEFAULT",
					NumWorkers: 4,
				},
			},
		},
	}

	err := p.MergeClusters()
	require.NoError(t, err)

	// Stop here on a length mismatch; the indexing below would otherwise
	// panic instead of reporting a regular test failure.
	require.Len(t, p.Clusters, 1)

	// The default cluster was merged with a subsequent one.
	pc0 := p.Clusters[0]
	assert.Equal(t, "default", strings.ToLower(pc0.Label))
	assert.Equal(t, 4, pc0.NumWorkers)
}
7 changes: 1 addition & 6 deletions bundle/config/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,7 @@ func (r *Root) MergeTargetOverrides(target *Target) error {
return err
}

err = r.Resources.MergeJobClusters()
if err != nil {
return err
}

err = r.Resources.MergeTasks()
err = r.Resources.Merge()
if err != nil {
return err
}
Expand Down

0 comments on commit c9c65b1

Please sign in to comment.