
Commit

Adding support for using existing_cluster_name as well as existing_cluster_id Azure#86
Azadehkhojandi committed Nov 28, 2019
1 parent c28d567 commit a0f7f78
Showing 15 changed files with 262 additions and 22 deletions.
5 changes: 4 additions & 1 deletion README.md
@@ -10,11 +10,14 @@

Kubernetes offers the facility of extending its API through the concept of 'Operators' ([Introducing Operators: Putting Operational Knowledge into Software](https://coreos.com/blog/introducing-operators.html)). This repository contains the resources and code to deploy an Azure Databricks Operator for Kubernetes.

![alt text](docs/images/azure-databricks-operator-highlevel.jpg "high level architecture")

![alt text](docs/images/azure-databricks-operator.jpg "high level architecture")

The Databricks operator is useful in situations where Kubernetes hosted applications wish to launch and use Databricks data engineering and machine learning tasks.

![alt text](docs/images/azure-databricks-operator.jpg "high level architecture")


The project was built using

1. [Kubebuilder](https://book.kubebuilder.io/)
1 change: 0 additions & 1 deletion api/v1alpha1/dcluster_types.go
@@ -32,7 +32,6 @@ type DclusterStatus struct {
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// +kubebuilder:printcolumn:name="ClusterID",type="string",JSONPath=".status.cluster_info.cluster_id"
// +kubebuilder:printcolumn:name="State",type="string",JSONPath=".status.cluster_info.state"
// +kubebuilder:printcolumn:name="NumWorkers",type="integer",JSONPath=".status.cluster_info.num_workers"
type Dcluster struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
4 changes: 2 additions & 2 deletions api/v1alpha1/djob_types.go
@@ -36,8 +36,8 @@ type Djob struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec *dbmodels.JobSettings `json:"spec,omitempty"`
Status *DjobStatus `json:"status,omitempty"`
Spec *JobSettings `json:"spec,omitempty"`
Status *DjobStatus `json:"status,omitempty"`
}

// IsBeingDeleted returns true if a deletion timestamp is set
88 changes: 88 additions & 0 deletions api/v1alpha1/djob_types_extra.go
@@ -0,0 +1,88 @@
/*
Copyright 2019 microsoft.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
dbmodels "github.com/xinsnake/databricks-sdk-golang/azure/models"
)

// JobSettings is similar to dbmodels.JobSettings; it exists because
// dbmodels.JobSettings does not support ExistingClusterName.
// ExistingClusterName allows discovering Databricks clusters by their Kubernetes object name.
type JobSettings struct {
ExistingClusterID string `json:"existing_cluster_id,omitempty" url:"existing_cluster_id,omitempty"`
ExistingClusterName string `json:"existing_cluster_name,omitempty" url:"existing_cluster_name,omitempty"`
NewCluster *dbmodels.NewCluster `json:"new_cluster,omitempty" url:"new_cluster,omitempty"`
NotebookTask *dbmodels.NotebookTask `json:"notebook_task,omitempty" url:"notebook_task,omitempty"`
SparkJarTask *dbmodels.SparkJarTask `json:"spark_jar_task,omitempty" url:"spark_jar_task,omitempty"`
SparkPythonTask *dbmodels.SparkPythonTask `json:"spark_python_task,omitempty" url:"spark_python_task,omitempty"`
SparkSubmitTask *dbmodels.SparkSubmitTask `json:"spark_submit_task,omitempty" url:"spark_submit_task,omitempty"`
Name string `json:"name,omitempty" url:"name,omitempty"`
Libraries []dbmodels.Library `json:"libraries,omitempty" url:"libraries,omitempty"`
EmailNotifications *dbmodels.JobEmailNotifications `json:"email_notifications,omitempty" url:"email_notifications,omitempty"`
TimeoutSeconds int32 `json:"timeout_seconds,omitempty" url:"timeout_seconds,omitempty"`
MaxRetries int32 `json:"max_retries,omitempty" url:"max_retries,omitempty"`
MinRetryIntervalMillis int32 `json:"min_retry_interval_millis,omitempty" url:"min_retry_interval_millis,omitempty"`
RetryOnTimeout bool `json:"retry_on_timeout,omitempty" url:"retry_on_timeout,omitempty"`
Schedule *dbmodels.CronSchedule `json:"schedule,omitempty" url:"schedule,omitempty"`
MaxConcurrentRuns int32 `json:"max_concurrent_runs,omitempty" url:"max_concurrent_runs,omitempty"`
}

// ToK8sJobSettings converts a Databricks JobSettings object to a k8s JobSettings object.
// It is needed to add ExistingClusterName and to follow the Kubernetes camelCase naming convention.
func ToK8sJobSettings(dbjs *dbmodels.JobSettings) JobSettings {
var k8sjs JobSettings
k8sjs.ExistingClusterID = dbjs.ExistingClusterID
k8sjs.NewCluster = dbjs.NewCluster
k8sjs.NotebookTask = dbjs.NotebookTask
k8sjs.SparkJarTask = dbjs.SparkJarTask
k8sjs.SparkPythonTask = dbjs.SparkPythonTask
k8sjs.SparkSubmitTask = dbjs.SparkSubmitTask
k8sjs.Name = dbjs.Name
k8sjs.Libraries = dbjs.Libraries
k8sjs.EmailNotifications = dbjs.EmailNotifications
k8sjs.TimeoutSeconds = dbjs.TimeoutSeconds
k8sjs.MaxRetries = dbjs.MaxRetries
k8sjs.MinRetryIntervalMillis = dbjs.MinRetryIntervalMillis
k8sjs.RetryOnTimeout = dbjs.RetryOnTimeout
k8sjs.Schedule = dbjs.Schedule
k8sjs.MaxConcurrentRuns = dbjs.MaxConcurrentRuns
return k8sjs
}

// ToDatabricksJobSettings converts a k8s JobSettings object to a Databricks JobSettings object.
// It is needed to add ExistingClusterName and to follow the Kubernetes camelCase naming convention.
func ToDatabricksJobSettings(k8sjs *JobSettings) dbmodels.JobSettings {

var dbjs dbmodels.JobSettings
dbjs.ExistingClusterID = k8sjs.ExistingClusterID
dbjs.NewCluster = k8sjs.NewCluster
dbjs.NotebookTask = k8sjs.NotebookTask
dbjs.SparkJarTask = k8sjs.SparkJarTask
dbjs.SparkPythonTask = k8sjs.SparkPythonTask
dbjs.SparkSubmitTask = k8sjs.SparkSubmitTask
dbjs.Name = k8sjs.Name
dbjs.Libraries = k8sjs.Libraries
dbjs.EmailNotifications = k8sjs.EmailNotifications
dbjs.TimeoutSeconds = k8sjs.TimeoutSeconds
dbjs.MaxRetries = k8sjs.MaxRetries
dbjs.MinRetryIntervalMillis = k8sjs.MinRetryIntervalMillis
dbjs.RetryOnTimeout = k8sjs.RetryOnTimeout
dbjs.Schedule = k8sjs.Schedule
dbjs.MaxConcurrentRuns = k8sjs.MaxConcurrentRuns
return dbjs
}
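
A minimal round-trip sketch (hypothetical test code, not part of this commit) showing how the two helpers above mirror each other. Note that ExistingClusterName exists only on the Kubernetes side, so it is dropped when converting back for the Databricks API:

package v1alpha1_test

import (
	"fmt"

	databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
	dbmodels "github.com/xinsnake/databricks-sdk-golang/azure/models"
)

func Example_jobSettingsRoundTrip() {
	// Databricks-side settings, as returned by the Jobs API.
	dbjs := dbmodels.JobSettings{Name: "sample-job", ExistingClusterID: "0923-164208-meows279"}

	// Convert to the Kubernetes-side type; ExistingClusterName starts empty
	// because dbmodels.JobSettings has no such field.
	k8sjs := databricksv1alpha1.ToK8sJobSettings(&dbjs)
	k8sjs.ExistingClusterName = "dcluster-interactive1" // resolved by the operator, never sent to Databricks

	// Converting back keeps every shared field but drops ExistingClusterName;
	// the Djob controller resolves it to an ExistingClusterID first.
	out := databricksv1alpha1.ToDatabricksJobSettings(&k8sjs)
	fmt.Println(out.Name, out.ExistingClusterID)
	// Output: sample-job 0923-164208-meows279
}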
59 changes: 58 additions & 1 deletion api/v1alpha1/zz_generated.deepcopy.go
@@ -298,7 +298,7 @@ func (in *Djob) DeepCopyInto(out *Djob) {
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
if in.Spec != nil {
in, out := &in.Spec, &out.Spec
*out = new(models.JobSettings)
*out = new(JobSettings)
(*in).DeepCopyInto(*out)
}
if in.Status != nil {
@@ -385,6 +385,63 @@ func (in *DjobStatus) DeepCopy() *DjobStatus {
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *JobSettings) DeepCopyInto(out *JobSettings) {
*out = *in
if in.NewCluster != nil {
in, out := &in.NewCluster, &out.NewCluster
*out = new(models.NewCluster)
(*in).DeepCopyInto(*out)
}
if in.NotebookTask != nil {
in, out := &in.NotebookTask, &out.NotebookTask
*out = new(models.NotebookTask)
(*in).DeepCopyInto(*out)
}
if in.SparkJarTask != nil {
in, out := &in.SparkJarTask, &out.SparkJarTask
*out = new(models.SparkJarTask)
(*in).DeepCopyInto(*out)
}
if in.SparkPythonTask != nil {
in, out := &in.SparkPythonTask, &out.SparkPythonTask
*out = new(models.SparkPythonTask)
(*in).DeepCopyInto(*out)
}
if in.SparkSubmitTask != nil {
in, out := &in.SparkSubmitTask, &out.SparkSubmitTask
*out = new(models.SparkSubmitTask)
(*in).DeepCopyInto(*out)
}
if in.Libraries != nil {
in, out := &in.Libraries, &out.Libraries
*out = make([]models.Library, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.EmailNotifications != nil {
in, out := &in.EmailNotifications, &out.EmailNotifications
*out = new(models.JobEmailNotifications)
(*in).DeepCopyInto(*out)
}
if in.Schedule != nil {
in, out := &in.Schedule, &out.Schedule
*out = new(models.CronSchedule)
**out = **in
}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobSettings.
func (in *JobSettings) DeepCopy() *JobSettings {
if in == nil {
return nil
}
out := new(JobSettings)
in.DeepCopyInto(out)
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Run) DeepCopyInto(out *Run) {
*out = *in
3 changes: 0 additions & 3 deletions config/crd/bases/databricks.microsoft.com_dclusters.yaml
@@ -16,9 +16,6 @@ spec:
- JSONPath: .status.cluster_info.state
name: State
type: string
- JSONPath: .status.cluster_info.num_workers
name: NumWorkers
type: integer
group: databricks.microsoft.com
names:
kind: Dcluster
6 changes: 6 additions & 0 deletions config/crd/bases/databricks.microsoft.com_djobs.yaml
@@ -36,6 +36,10 @@ spec:
metadata:
type: object
spec:
description: JobSettings is similar to dbmodels.JobSettings; it exists
  because dbmodels.JobSettings does not support ExistingClusterName.
  ExistingClusterName allows discovering Databricks clusters by their
  Kubernetes object name.
properties:
email_notifications:
properties:
@@ -56,6 +60,8 @@
type: object
existing_cluster_id:
type: string
existing_cluster_name:
type: string
libraries:
items:
properties:
2 changes: 1 addition & 1 deletion config/samples/databricks_v1alpha1_dcluster.yaml
@@ -7,4 +7,4 @@ spec:
node_type_id: Standard_D3_v2
autoscale:
min_workers: 2
max_workers: 5
max_workers: 3
20 changes: 18 additions & 2 deletions controllers/dcluster_controller.go
@@ -22,14 +22,14 @@ import (
"time"

"github.com/go-logr/logr"
databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
dbazure "github.com/xinsnake/databricks-sdk-golang/azure"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
)

// DclusterReconciler reconciles a Dcluster object
@@ -103,8 +103,24 @@ func (r *DclusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}

const dclusterIndexKey = ".status.cluster_info.cluster_id"

// SetupWithManager adds the controller manager
func (r *DclusterReconciler) SetupWithManager(mgr ctrl.Manager) error {

if err := mgr.GetFieldIndexer().IndexField(&databricksv1alpha1.Dcluster{}, dclusterIndexKey, func(rawObj runtime.Object) []string {
dcluster := rawObj.(*databricksv1alpha1.Dcluster)
if dcluster == nil {
return nil
}
if dcluster.Status == nil || dcluster.Status.ClusterInfo == nil {
return nil
}
return []string{dcluster.Status.ClusterInfo.ClusterID}
}); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&databricksv1alpha1.Dcluster{}).
Complete(r)
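
The index registered above is what the Djob controller (next file) queries through client.MatchingField(dclusterIndexKey, ...) to resolve a Databricks cluster ID back to its owning Dcluster object, without listing every cluster in the namespace.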
51 changes: 44 additions & 7 deletions controllers/djob_controller_databricks.go
@@ -19,23 +19,60 @@ package controllers
import (
"context"
"fmt"
"reflect"
"strings"

databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/client"
"strings"
)

func (r *DjobReconciler) submit(instance *databricksv1alpha1.Djob) error {
r.Log.Info(fmt.Sprintf("Submitting job %s", instance.GetName()))

instance.Spec.Name = instance.GetName()

job, err := r.APIClient.Jobs().Create(*instance.Spec)
// Get the existing Databricks cluster by cluster name and set ExistingClusterID, or
// get the existing Databricks cluster by cluster ID.
var ownerInstance databricksv1alpha1.Dcluster
if len(instance.Spec.ExistingClusterName) > 0 {
dClusterNamespacedName := types.NamespacedName{Name: instance.Spec.ExistingClusterName, Namespace: instance.Namespace}
err := r.Get(context.Background(), dClusterNamespacedName, &ownerInstance)
if err != nil {
return err
}
if (ownerInstance.Status != nil) && (ownerInstance.Status.ClusterInfo != nil) && len(ownerInstance.Status.ClusterInfo.ClusterID) > 0 {
instance.Spec.ExistingClusterID = ownerInstance.Status.ClusterInfo.ClusterID
} else {
return fmt.Errorf("failed to get ClusterID of %v", instance.Spec.ExistingClusterName)
}
} else if len(instance.Spec.ExistingClusterID) > 0 {
var dclusters databricksv1alpha1.DclusterList
err := r.List(context.Background(), &dclusters, client.InNamespace(instance.Namespace), client.MatchingField(dclusterIndexKey, instance.Spec.ExistingClusterID))
if err != nil {
return err
}
if len(dclusters.Items) == 1 {
ownerInstance = dclusters.Items[0]
} else {
return fmt.Errorf("failed to get ClusterID of %v", instance.Spec.ExistingClusterID)
}
}
// Set the existing cluster as owner of the job.
if len(ownerInstance.APIVersion) > 0 && len(ownerInstance.Kind) > 0 && len(ownerInstance.GetName()) > 0 {
references := []metav1.OwnerReference{
{
APIVersion: ownerInstance.APIVersion,
Kind: ownerInstance.Kind,
Name: ownerInstance.GetName(),
UID: ownerInstance.GetUID(),
},
}
instance.ObjectMeta.SetOwnerReferences(references)
}
jobSettings := databricksv1alpha1.ToDatabricksJobSettings(instance.Spec)
job, err := r.APIClient.Jobs().Create(jobSettings)
if err != nil {
return err
}

instance.Spec.Name = instance.GetName()
instance.Status = &databricksv1alpha1.DjobStatus{
JobStatus: &job,
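
With the owner reference in place, deleting the Dcluster lets the Kubernetes garbage collector clean up dependent Djobs as well. A sketch of the metadata the code above produces (names taken from this commit's samples; the UID is illustrative):

metadata:
  name: djob-basic-with-cluster-name
  ownerReferences:
    - apiVersion: databricks.microsoft.com/v1alpha1
      kind: Dcluster
      name: dcluster-interactive1
      uid: 9a1b2c3d-0000-0000-0000-000000000000  # illustrative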
6 changes: 3 additions & 3 deletions controllers/djob_controller_test.go
@@ -54,11 +54,11 @@ var _ = Describe("Djob Controller", func() {
Namespace: "default",
}

spec := &dbmodels.JobSettings{
spec := databricksv1alpha1.JobSettings{
NewCluster: &dbmodels.NewCluster{
SparkVersion: "5.3.x-scala2.11",
NodeTypeID: "Standard_D3_v2",
NumWorkers: 10,
NumWorkers: 2,
},
Libraries: []dbmodels.Library{
{
@@ -86,7 +86,7 @@
Name: key.Name,
Namespace: key.Namespace,
},
Spec: spec,
Spec: &spec,
}

// Create
2 changes: 1 addition & 1 deletion controllers/run_controller_test.go
@@ -59,7 +59,7 @@ var _ = Describe("Run Controller", func() {
Namespace: "default",
}

jobSpec := &dbmodels.JobSettings{
jobSpec := &databricksv1alpha1.JobSettings{
NewCluster: &dbmodels.NewCluster{
SparkVersion: "5.3.x-scala2.11",
NodeTypeID: "Standard_D3_v2",
(Binary file not shown.)
@@ -0,0 +1,17 @@
apiVersion: databricks.microsoft.com/v1alpha1
kind: Djob
metadata:
name: djob-basic-with-cluster-name
spec:
# This spec is directly linked to the JobSettings structure
# https://docs.databricks.com/api/latest/jobs.html#jobsettings
existing_cluster_name: dcluster-interactive1
timeout_seconds: 3600
max_retries: 1
schedule:
quartz_cron_expression: 0 0/1 * * * ?
timezone_id: America/Los_Angeles
notebook_task:
base_parameters:
"name": "Azadeh"
notebook_path: "/samples/basic1"
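
Assuming the operator and a Dcluster named dcluster-interactive1 are already deployed in the target namespace, this sample can be applied with `kubectl apply -f <sample-file>.yaml` and watched with `kubectl get djobs`; the Djob controller resolves existing_cluster_name to the cluster's Databricks ID before submitting the job.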
