Add JobTaskClusterSpec validate mutator #1784

Merged
6 commits merged on Sep 25, 2024
Changes from 4 commits
161 changes: 161 additions & 0 deletions bundle/config/validate/job_task_cluster_spec.go
@@ -0,0 +1,161 @@
package validate

import (
"context"
"fmt"
"strings"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/jobs"
)

// JobTaskClusterSpec validates that job tasks have a cluster spec defined
// if the task requires a cluster
func JobTaskClusterSpec() bundle.ReadOnlyMutator {
return &jobTaskClusterSpec{}
}

type jobTaskClusterSpec struct {
}

func (v *jobTaskClusterSpec) Name() string {
return "validate:job_task_cluster_spec"
}

func (v *jobTaskClusterSpec) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
diags := diag.Diagnostics{}

jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"))

for resourceName, job := range rb.Config().Resources.Jobs {
resourcePath := jobsPath.Append(dyn.Key(resourceName))

for taskIndex, task := range job.Tasks {
taskPath := resourcePath.Append(dyn.Key("tasks"), dyn.Index(taskIndex))

diags = diags.Extend(validateJobTask(rb, task, taskPath))
}
}
Contributor
This mixes access to the typed configuration with dyn.Value access.

Any chance we can just use the dyn.Value one?

Collaborator Author
I can change it. I think it's easier to implement and test validations using typed configuration though. The only problematic part is constructing the path to a property.

Contributor
What I was thinking is you can use dyn.MapByPattern to find all tasks across all jobs (and you get the right base path), and then use convert.ToTyped to get the typed jobs.Task value.
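For reference, a rough sketch of that alternative, written against the Apply method above. This is not part of the PR: dyn.NewPattern, dyn.AnyKey, dyn.AnyIndex, dyn.MapByPattern, convert.ToTyped (from libs/dyn/convert), diag.FromErr, and rb.Config().Value() are assumed to exist with these signatures based on the reviewer's suggestion.

```go
// Sketch only: walk every task across all jobs via the dynamic configuration,
// convert each match back into a typed jobs.Task, and reuse validateJobTask
// with the path that MapByPattern already provides.
pattern := dyn.NewPattern(
	dyn.Key("resources"),
	dyn.Key("jobs"),
	dyn.AnyKey(),
	dyn.Key("tasks"),
	dyn.AnyIndex(),
)

_, err := dyn.MapByPattern(rb.Config().Value(), pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
	var task jobs.Task
	if err := convert.ToTyped(&task, v); err != nil {
		return dyn.InvalidValue, err
	}

	// p already points at resources.jobs.<name>.tasks[<i>], so no manual path building is needed.
	diags = diags.Extend(validateJobTask(rb, task, p))
	return v, nil
})
if err != nil {
	diags = diags.Extend(diag.FromErr(err))
}
```

The trade-off discussed here: the typed loop in the PR is easier to read and unit-test, while this variant gets the exact task path from the same traversal that produces the value.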


return diags
}

func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path) diag.Diagnostics {
diags := diag.Diagnostics{}

var specified []string
var unspecified []string

if task.JobClusterKey != "" {
specified = append(specified, "job_cluster_key")
} else {
unspecified = append(unspecified, "job_cluster_key")
}

if task.EnvironmentKey != "" {
specified = append(specified, "environment_key")
} else {
unspecified = append(unspecified, "environment_key")
}

if task.ExistingClusterId != "" {
specified = append(specified, "existing_cluster_id")
} else {
unspecified = append(unspecified, "existing_cluster_id")
}

if task.NewCluster != nil {
specified = append(specified, "new_cluster")
} else {
unspecified = append(unspecified, "new_cluster")
}

if task.ForEachTask != nil {
forEachTaskPath := taskPath.Append(dyn.Key("for_each_task"), dyn.Key("task"))

diags = diags.Extend(validateJobTask(rb, task.ForEachTask.Task, forEachTaskPath))
}

if isComputeTask(task) && len(specified) == 0 {
if task.NotebookTask != nil {
// notebook tasks without a cluster spec will use the notebook environment
} else {
// the path alone might not be very helpful; adding the user-specified task key clarifies the context
detail := fmt.Sprintf(
"Task %q requires a cluster or an environment to run. Specify one of the following fields: %s",
task.TaskKey,
strings.Join(unspecified, ", "),
)

diags = diags.Append(diag.Diagnostic{
Severity: diag.Error,
Summary: "Missing required cluster or environment settings",
Detail: detail,
Locations: rb.Config().GetLocations(taskPath.String()),
Paths: []dyn.Path{taskPath},
})
}
}

return diags
}

// isComputeTask returns true if the task runs on a cluster or serverless GC
func isComputeTask(task jobs.Task) bool {
if task.NotebookTask != nil {
// if warehouse_id is set, it's a SQL notebook that doesn't need a cluster or serverless GC
if task.NotebookTask.WarehouseId != "" {
return false
} else {
// task settings don't require specifying a cluster/serverless GC, but the task itself can run on one;
// we handle that case separately in validateJobTask
return true
}
}

if task.PythonWheelTask != nil {
return true
}

if task.DbtTask != nil {
return true
}

if task.SparkJarTask != nil {
return true
}

if task.SparkSubmitTask != nil {
return true
}

if task.SparkPythonTask != nil {
return true
}

if task.SqlTask != nil {
return false
}

if task.PipelineTask != nil {
// while pipelines use clusters, pipeline tasks don't; they only trigger pipelines
return false
}

if task.RunJobTask != nil {
return false
}

if task.ConditionTask != nil {
return false
}

// a for_each_task doesn't use clusters itself, though the underlying task(s) can
if task.ForEachTask != nil {
return false
}

return false
}
203 changes: 203 additions & 0 deletions bundle/config/validate/job_task_cluster_spec_test.go
@@ -0,0 +1,203 @@
package validate

import (
"context"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
)

func TestJobTaskClusterSpec(t *testing.T) {
expectedSummary := "Missing required cluster or environment settings"

type testCase struct {
name string
task jobs.Task
errorPath string
errorDetail string
errorSummary string
}

testCases := []testCase{
{
name: "valid notebook task",
task: jobs.Task{
// while a cluster is needed, the notebook environment will be used to create one
NotebookTask: &jobs.NotebookTask{},
},
},
{
name: "valid notebook task (job_cluster_key)",
task: jobs.Task{
JobClusterKey: "cluster1",
NotebookTask: &jobs.NotebookTask{},
},
},
{
name: "valid notebook task (new_cluster)",
task: jobs.Task{
NewCluster: &compute.ClusterSpec{},
NotebookTask: &jobs.NotebookTask{},
},
},
{
name: "valid notebook task (existing_cluster_id)",
task: jobs.Task{
ExistingClusterId: "cluster1",
NotebookTask: &jobs.NotebookTask{},
},
},
{
name: "valid SQL notebook task",
task: jobs.Task{
NotebookTask: &jobs.NotebookTask{
WarehouseId: "warehouse1",
},
},
},
{
name: "valid python wheel task",
task: jobs.Task{
JobClusterKey: "cluster1",
PythonWheelTask: &jobs.PythonWheelTask{},
},
},
{
name: "valid python wheel task (environment_key)",
task: jobs.Task{
EnvironmentKey: "environment1",
PythonWheelTask: &jobs.PythonWheelTask{},
},
},
{
name: "valid dbt task",
task: jobs.Task{
JobClusterKey: "cluster1",
DbtTask: &jobs.DbtTask{},
},
},
{
name: "valid spark jar task",
task: jobs.Task{
JobClusterKey: "cluster1",
SparkJarTask: &jobs.SparkJarTask{},
},
},
{
name: "valid spark submit",
task: jobs.Task{
NewCluster: &compute.ClusterSpec{},
SparkSubmitTask: &jobs.SparkSubmitTask{},
},
},
{
name: "valid spark python task",
task: jobs.Task{
JobClusterKey: "cluster1",
SparkPythonTask: &jobs.SparkPythonTask{},
},
},
{
name: "valid SQL task",
task: jobs.Task{
SqlTask: &jobs.SqlTask{},
},
},
{
name: "valid pipeline task",
task: jobs.Task{
PipelineTask: &jobs.PipelineTask{},
},
},
{
name: "valid run job task",
task: jobs.Task{
RunJobTask: &jobs.RunJobTask{},
},
},
{
name: "valid condition task",
task: jobs.Task{
ConditionTask: &jobs.ConditionTask{},
},
},
{
name: "valid for each task",
task: jobs.Task{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
JobClusterKey: "cluster1",
NotebookTask: &jobs.NotebookTask{},
},
},
},
},
{
name: "invalid python wheel task",
task: jobs.Task{
PythonWheelTask: &jobs.PythonWheelTask{},
TaskKey: "my_task",
},
errorPath: "resources.jobs.job1.tasks[0]",
errorDetail: "Task \"my_task\" requires a cluster or an environment to run. Specify one of the " +
"following fields: job_cluster_key, environment_key, existing_cluster_id, new_cluster",
errorSummary: expectedSummary,
},
{
name: "invalid for each task",
task: jobs.Task{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
PythonWheelTask: &jobs.PythonWheelTask{},
TaskKey: "my_task",
},
},
},
errorPath: "resources.jobs.job1.tasks[0].for_each_task.task",
errorDetail: "Task \"my_task\" requires a cluster or an environment to run. Specify one of the " +
"following fields: job_cluster_key, environment_key, existing_cluster_id, new_cluster",
errorSummary: expectedSummary,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
job := &resources.Job{
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{tc.task},
},
}

b := createBundle(map[string]*resources.Job{"job1": job})
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobTaskClusterSpec())

if tc.errorPath != "" || tc.errorDetail != "" || tc.errorSummary != "" {
assert.Len(t, diags, 1)
assert.Len(t, diags[0].Paths, 1)

diag := diags[0]

assert.Equal(t, tc.errorPath, diag.Paths[0].String())
assert.Equal(t, tc.errorSummary, diag.Summary)
assert.Equal(t, tc.errorDetail, diag.Detail)
} else {
assert.ElementsMatch(t, []string{}, diags)
}
})
}
}

func createBundle(jobs map[string]*resources.Job) *bundle.Bundle {
return &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: jobs,
},
},
}
}
1 change: 1 addition & 0 deletions bundle/config/validate/validate.go
@@ -34,6 +34,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
JobClusterKeyDefined(),
FilesToSync(),
ValidateSyncPatterns(),
JobTaskClusterSpec(),
))
}
