This repository has been archived by the owner on Jan 28, 2022. It is now read-only.

Commit

Merge pull request #1 from Azadehkhojandi/master
initial commit
Azadehkhojandi authored May 15, 2019
2 parents b8fab65 + 721ad4b commit 4f5bcb5
Showing 95 changed files with 6,194 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .gitignore
@@ -10,3 +10,6 @@

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

.vscode/
.env
68 changes: 67 additions & 1 deletion README.md
@@ -1,5 +1,71 @@
# Azure Databricks operator

# Contributing
## Introduction

The Azure Databricks operator contains two projects: a Go application, which is a Kubernetes controller that watches CRDs defining a Databricks job (input, output, functions, transformers, etc.), and a Python Flask app that sends commands to Databricks.

![alt text](docs/images/azure-databricks-operator.jpg "high level architecture")
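
As a rough sketch of what this looks like on a running cluster once both pieces are deployed (the CRD name pattern and the namespace below are illustrative assumptions, not the operator's actual manifests), you could inspect the registered CRDs and the pods:

```shell
# Look for Databricks job CRDs registered by the operator (name pattern is an assumption)
kubectl get crds | grep -i databricks

# Check the controller and Flask API pods (namespace is an assumption)
kubectl get pods -n databricks-operator-system
```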

The project was built using

1. [Kubebuilder](https://book.kubebuilder.io/)
2. [Swagger Codegen](https://github.com/swagger-api/swagger-codegen)
3. [Flask-RESTPlus](http://flask-restplus.readthedocs.io)
4. [Flask](http://flask.pocoo.org/)

![alt text](docs/images/development-flow.jpg "development flow")

### Prerequisites and Assumptions

1. You have [Minikube](https://kubernetes.io/docs/tasks/tools/install-minikube/), [Kind](https://github.com/kubernetes-sigs/kind), or Docker Desktop installed on your local computer with RBAC enabled.
2. You have a Kubernetes cluster running.
3. You have the kubectl command-line tool (kubectl CLI) installed.
4. You have Helm and Tiller installed.

* Configure a Kubernetes cluster on your machine.
> Make sure a kubeconfig file is configured.
> If you use AKS, you can run: `az aks get-credentials --resource-group $RG_NAME --name $Cluster_NAME`

#### Basic commands to check your cluster

```shell
kubectl config get-contexts
kubectl cluster-info
kubectl version
kubectl get pods -n kube-system

```
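
Since Helm and Tiller are listed as prerequisites, it may also help to confirm they are healthy. A minimal check, assuming Helm 2 with Tiller installed into `kube-system` via `helm init`:

```shell
# Both the client and the server (Tiller) version should be reported
helm version

# In a default installation Tiller runs as a deployment in kube-system
kubectl get pods -n kube-system -l name=tiller
```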

#### Kubernetes on WSL

On the Windows command line, run `kubectl config view` to find the values of `<windows-user-name>`, `<minikubeip>`, and `<port>`.

```shell
mkdir -p ~/.kube \
&& cp /mnt/c/Users/<windows-user-name>/.kube/config ~/.kube

kubectl config set-cluster minikube --server=https://<minikubeip>:<port> --certificate-authority=/mnt/c/Users/<windows-user-name>/.minikube/ca.crt
kubectl config set-credentials minikube --client-certificate=/mnt/c/Users/<windows-user-name>/.minikube/client.crt --client-key=/mnt/c/Users/<windows-user-name>/.minikube/client.key
kubectl config set-context minikube --cluster=minikube --user=minikube

```
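
Once the context is set, a quick sanity check from inside WSL (assuming the `minikube` context configured above) confirms the copied certificates work:

```shell
kubectl config use-context minikube
kubectl get nodes
```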

More info:

* https://devkimchi.com/2018/06/05/running-kubernetes-on-wsl/
* https://www.jamessturtevant.com/posts/Running-Kubernetes-Minikube-on-Windows-10-with-WSL/

## Docs and Samples

Docs and samples are coming soon.
## Main Contributors

1. [Jordan Knight](https://www.linkedin.com/in/jakkaj/)
2. [Paul Bouwer](https://www.linkedin.com/in/pbouwer/)
3. [Lace Lofranco](https://www.linkedin.com/in/lacelofranco/)
4. [Allan Targino](https://www.linkedin.com/in/allan-targino/)
5. [Azadeh Khojandi](https://www.linkedin.com/in/azadeh-khojandi-ba441b3/)

## Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a
Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us
1 change: 1 addition & 0 deletions databricks-api/.dockerignore
@@ -0,0 +1 @@
docs
4 changes: 4 additions & 0 deletions databricks-api/.gitignore
@@ -0,0 +1,4 @@
.vscode/
.env
pip.conf
__pycache__/
19 changes: 19 additions & 0 deletions databricks-api/Dockerfile
@@ -0,0 +1,19 @@
FROM continuumio/miniconda3:latest
LABEL maintainer="[email protected]"

# Databricks connection settings; supply real values at runtime
ENV DATABRICKS_HOST ""
ENV DATABRICKS_TOKEN ""

# Install gettext (provides envsubst)
RUN apt-get update && apt-get install gettext -y && apt-get clean

# Create the conda environment and put it first on the PATH
WORKDIR /tmp
COPY environment.yml ./
RUN conda env create -f environment.yml
RUN echo "source activate databricksapi" > ~/.bashrc
ENV PATH /opt/conda/envs/databricksapi/bin:$PATH
RUN /bin/bash -c "source activate databricksapi"

# Install the remaining pip dependencies
COPY requirements.txt ./
RUN pip install -r requirements.txt

# Copy the Flask app and drop the build directory
COPY /app /app
WORKDIR /app
RUN rm -r /tmp

EXPOSE 5000
ENTRYPOINT ["bash","-c"]
CMD ["python app.py"]
251 changes: 251 additions & 0 deletions databricks-api/app/app.py
@@ -0,0 +1,251 @@
from flask import Flask
from flask_restplus import Resource, Api, fields
import logging
from databricks_api import DatabricksAPI
import json
import os
from dbricks_client import dbricks_client

app = Flask(__name__)

api = Api(app,
          title='Databricks Controller',
          version='1.0',
          description='Manage Databricks')

runs_ns = api.namespace('api/jobs/runs',
                        description='Operations related to job management')


create_job_result = api.model('Create Job Result', {
    'run_id': fields.Integer
})

create_job_status = api.model('Create Job Status', {
    'run_name': fields.String(required=True,
                              example="aztest1-uppercase",
                              description='The name identifier of a job run'),
    'result': fields.List(fields.Nested(create_job_result))
})

new_cluster = api.model('new_cluster', {
    'spark_version': fields.String,
    'spark_conf': fields.Raw,
    'node_type_id': fields.String,
    'spark_env_vars': fields.Raw,
    'enable_elastic_disk': fields.Boolean,
    'num_workers': fields.Integer
})


cluster_spec = api.model('cluster_spec', {
    'new_cluster': fields.List(fields.Nested(new_cluster)),
    'libraries': fields.List(fields.Raw),
})

notebook_task = api.model('notebook_task', {
    'notebook_path': fields.String
})

run_definition = api.model('Run definition', {
    'run_name':
        fields.String(required=True,
                      example="aztest1-uppercase",
                      description='The name identifier of a job run'),
    'notebook_spec_secrets':
        fields.Raw(required=False,
                   example={
                       "eventhub_source_cs": "Endpoint=sb://xxxx.servicebus.windows.net/;SharedAccessKeyName=xxxx;SharedAccessKey=xxxx=;EntityPath=sourceeh",
                       "eventhub_destination_cs": "Endpoint=sb://xxxx.servicebus.windows.net/;SharedAccessKeyName=xxxx;SharedAccessKey=xxxx=;EntityPath=desteh",
                       "adl2_destination_oauth2_clientid": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
                       "adl2_destination_oauth2_clientsecret": "xxxx=",
                       "adl2_destination_oauth2_tenantid": "xxxx=",
                       "adl2_destination_cs": "abfss://<file-system-name>@<storage-account-name>.dfs.core.windows.net/folder1",
                   }),
    'notebook_spec':
        fields.Raw(required=False,
                   example={
                       'TryCount': 3,
                       'LogError': True
                   }),
    'notebook_additional_libraries':
        fields.Raw(required=False,
                   example=['t_uppercase',
                            't_lowercase',
                            't_append_a_to_attribute',
                            't_append_b_to_attribute']),
    'new_cluster': fields.Nested(new_cluster),
    'timeout_seconds': fields.Integer(required=False,
                                      example=3600,
                                      description='timeout in seconds'),
    'notebook_task': fields.Nested(notebook_task)
})

state = api.model('state', {
    'life_cycle_state': fields.String,
    'result_state': fields.String,
    'state_message': fields.String,
})

cluster_instance = api.model('cluster_instance', {
    'cluster_id': fields.String})

run = api.model('Run', {
    'job_id': fields.Integer,
    'run_id': fields.Integer,
    'number_in_job': fields.Integer,
    'task': fields.Raw,
    'cluster_spec': fields.List(fields.Nested(cluster_spec)),
    'state': fields.List(fields.Nested(state)),
    'cluster_instance': fields.List(fields.Nested(cluster_instance)),
    'start_time': fields.Raw,
    'setup_duration': fields.Integer,
    'execution_duration': fields.Integer,
    'cleanup_duration': fields.Integer,
    'creator_user_name': fields.String,
    'run_name': fields.String,
    'run_page_url': fields.Url,
    'run_type': fields.String,
})

list_runs_result = api.model('List Runs Result', {
    'runs': fields.List(fields.Nested(run)),
    'has_more': fields.Boolean
})

list_runs_status = api.model('List Runs Status', {
    'result': fields.List(fields.Nested(list_runs_result))
})


@api.route('/status')
class Status(Resource):
    def get(self):
        return {'status': 'ok'}


@runs_ns.route('/')
class RunsList(Resource):
    '''Shows a list of all runs, and lets you POST to submit new run'''
    @runs_ns.doc('list_runs')
    @runs_ns.marshal_with(list_runs_status)
    def get(self):
        '''List all runs'''
        result = dbricks_client.list_runs(db=DATABRICKS_API)
        return {'result': result}

    @runs_ns.doc('submit run')
    @runs_ns.expect(run_definition)
    @runs_ns.marshal_with(create_job_status, code=201)
    def post(self):
        '''Create a new task'''

        data = api.payload
        if data is None:
            return {
                "run_name": None,
                "result": None
            }

        run_name = data['run_name']
        notebook_spec_secrets = {}
        if 'notebook_spec_secrets' in data:
            notebook_spec_secrets = data['notebook_spec_secrets']

        notebook_spec = {}
        if 'notebook_spec' in data:
            notebook_spec = data['notebook_spec']

        # Default to an empty list so the loop below is safe when the
        # payload has no additional libraries.
        notebook_additional_libraries = []
        if 'notebook_additional_libraries' in data:
            notebook_additional_libraries = data['notebook_additional_libraries']

        checkpoint_location = "dbfs:/checkpoints/" + run_name + "/output.txt"

        # Read job config
        with open(JOB_CONFIG_PATH) as job_config_file:
            job_config = json.load(job_config_file)

        # Set job config
        job_config["run_name"] = run_name

        # Add additional library to libraries
        libraries = job_config["libraries"]

        for lib in notebook_additional_libraries:
            libraries.append({
                lib['type']: lib['properties']
            })

        # All transformer packages require .transformer module w/ transform func
        # transformer_names.append(transformer + ".transformer")

        if "timeout_seconds" in data:
            timeout_seconds = data["timeout_seconds"]
            if timeout_seconds is not None and type(timeout_seconds) is int:
                job_config["timeout_seconds"] = timeout_seconds

        if "new_cluster" in data:
            new_cluster = data["new_cluster"]
            for key in new_cluster:
                value = new_cluster[key]
                job_config["new_cluster"][key] = value

        if "notebook_task" in data:
            notebook_task = data["notebook_task"]
            for key in notebook_task:
                value = notebook_task[key]
                job_config["notebook_task"][key] = value

        result = dbricks_client.create_job(db=DATABRICKS_API,
                                           run_name=run_name,
                                           notebook_spec_secrets=notebook_spec_secrets,
                                           notebook_spec=notebook_spec,
                                           checkpoint_location=checkpoint_location,
                                           job_config=job_config)
        return {'run_name': run_name, 'result': result}


@runs_ns.response(404, 'run not found')
@runs_ns.param('run_id', 'The run identifier')
@runs_ns.route('/<int:run_id>')
class Run(Resource):
    '''Show a single run job item and lets you delete it'''
    @runs_ns.doc('get run')
    # @jobs_ns.marshal_with(run_status)
    def get(self, run_id):
        print(run_id)
        result = dbricks_client.get_run(db=DATABRICKS_API, run_id=run_id)
        return {'result': result}

    @runs_ns.doc('delete run')
    # @runs_ns.response(204, 'run deleted')
    def delete(self, run_id):
        '''Delete a run given its identifier'''
        print(run_id)
        cancel_run_result, delete_run_result = dbricks_client.cancel_and_delete_run(
            db=DATABRICKS_API, run_id=run_id)
        return {'run_id': run_id, 'result': delete_run_result}


if __name__ == '__main__':
    log_fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(level=logging.INFO, format=log_fmt)

    JOB_CONFIG_PATH = os.path.join(os.path.dirname(
        __file__), "config", "job.config.json")

    # Provide a host and token for connecting to DataBricks
    DATABRICKS_HOST = os.getenv("DATABRICKS_HOST")
    DATABRICKS_TOKEN = os.getenv("DATABRICKS_TOKEN")

    PYPI_INDEX_URL = os.getenv("PYPI_INDEX_URL")

    DATABRICKS_API = DatabricksAPI(
        host=DATABRICKS_HOST,
        token=DATABRICKS_TOKEN
    )

    app.run(host='0.0.0.0')
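
Based on the `run_definition` model and the routes above, exercising a locally running instance could look roughly like this (host, port, notebook path, and the run id are illustrative; only `run_name` is required, and any omitted cluster settings fall back to `config/job.config.json`):

```shell
# Health check
curl http://localhost:5000/status

# Submit a run
curl -X POST http://localhost:5000/api/jobs/runs/ \
  -H "Content-Type: application/json" \
  -d '{
        "run_name": "aztest1-uppercase",
        "notebook_task": {"notebook_path": "/Shared/uppercase"},
        "new_cluster": {"num_workers": 2},
        "timeout_seconds": 3600
      }'

# List runs, fetch a single run, or cancel and delete it
curl http://localhost:5000/api/jobs/runs/
curl http://localhost:5000/api/jobs/runs/42
curl -X DELETE http://localhost:5000/api/jobs/runs/42
```
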
15 changes: 15 additions & 0 deletions databricks-api/app/config/job.config.json
@@ -0,0 +1,15 @@
{
    "new_cluster": {
        "spark_version": "5.2.x-scala2.11",
        "node_type_id": "Standard_DS12_v2",
        "num_workers": 1,
        "spark_env_vars": {
            "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
        }
    },
    "libraries": [],
    "timeout_seconds": 3600,
    "notebook_task": {
        "notebook_path": ""
    }
}
