diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 0000000..faa1fb9
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,17 @@
+[run]
+branch = True
+source = cicd_sample_project
+
+[report]
+exclude_lines =
+    if self.debug:
+    pragma: no cover
+    raise NotImplementedError
+    if __name__ == .__main__.:
+
+ignore_errors = True
+omit =
+    tests/*
+    setup.py
+    # this file is autogenerated by dbx
+    cicd_sample_project/common.py
diff --git a/.dbx/project.json b/.dbx/project.json
new file mode 100644
index 0000000..e8ff4ba
--- /dev/null
+++ b/.dbx/project.json
@@ -0,0 +1,19 @@
+{
+    "environments": {
+        "default": {
+            "profile": "dev",
+            "workspace_dir": "/Shared/dbx/projects/dbx-example-project",
+            "artifact_location": "dbfs:/dbx/dbx-example-project"
+        },
+        "dev": {
+            "profile": "dev",
+            "workspace_dir": "/Shared/dbx/projects/dbx-example-project",
+            "artifact_location": "dbfs:/dbx/dbx-example-project"
+        },
+        "staging": {
+            "profile": "staging",
+            "workspace_dir": "/Shared/dbx/projects/dbx-example-project",
+            "artifact_location": "dbfs:/dbx/dbx-example-project"
+        }
+    }
+}
\ No newline at end of file
diff --git a/.github/workflows/onpush.yml b/.github/workflows/onpush.yml
new file mode 100644
index 0000000..373a2a5
--- /dev/null
+++ b/.github/workflows/onpush.yml
@@ -0,0 +1,51 @@
+name: CI pipeline
+
+on:
+  push:
+    branches:
+      - '**'
+    tags-ignore:
+      - 'v*' # this tag type is used for release pipelines
+
+jobs:
+  ci-pipeline:
+
+    runs-on: ubuntu-latest
+    strategy:
+      max-parallel: 4
+
+    env:
+      DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
+      DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
+
+    steps:
+      - uses: actions/checkout@v1
+
+      - name: Set up Python
+        uses: actions/setup-python@v2
+        with:
+          python-version: 3.7.5
+
+      - name: Install pip
+        run: |
+          python -m pip install --upgrade pip
+
+      - name: Install dependencies and project in dev mode
+        run: |
+          pip install -r unit-requirements.txt
+          pip install -e .
+
+      - name: Run unit tests
+        run: |
+          echo "Launching unit tests"
+          pytest tests/unit
+
+      - name: Deploy integration test
+        run: |
+          dbx deploy --jobs=cicd-sample-project-sample-integration-test --files-only
+
+      - name: Run integration test
+        run: |
+          dbx launch --job=cicd-sample-project-sample-integration-test --as-run-submit --trace
+
+
diff --git a/.github/workflows/onrelease.yml b/.github/workflows/onrelease.yml
new file mode 100644
index 0000000..b0814cc
--- /dev/null
+++ b/.github/workflows/onrelease.yml
@@ -0,0 +1,54 @@
+name: Release pipeline
+
+on:
+  push:
+    tags:
+      - 'v*' # Push events to matching v*, e.g. v1.0, v20.15.10
+
+
+jobs:
+  release-pipeline:
+
+    runs-on: ubuntu-latest
+    strategy:
+      max-parallel: 4
+      matrix:
+        python-version: [ 3.7 ]
+
+    env:
+      DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
+      DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
+
+    steps:
+      - uses: actions/checkout@v1
+
+      - name: Set up Python
+        uses: actions/setup-python@v1
+        with:
+          python-version: 3.7
+
+      - name: Install pip
+        run: |
+          python -m pip install --upgrade pip
+
+      - name: Install dependencies and project in dev mode
+        run: |
+          pip install -r unit-requirements.txt
+
+      - name: Deploy the job
+        run: |
+          dbx deploy --jobs=cicd-sample-project-sample
+
+      - name: Create Release
+        id: create_release
+        uses: actions/create-release@v1
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # This token is provided by Actions
+        with:
+          tag_name: ${{ github.ref }}
+          release_name: Release ${{ github.ref }}
+          body: |
+            Release for version ${{ github.ref }}.
+          draft: false
+          prerelease: false
+
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f8cdf75
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,34 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# Distribution / packaging
+*.egg-info/
+build
+dist
+
+# Unit test / coverage reports
+.coverage
+coverage.xml
+junit/*
+htmlcov/*
+
+# Caches
+.pytest_cache/
+
+# VSCode
+.vscode/
+
+# Idea
+.idea/
+*.iml
+
+# MacOS
+.DS_Store
+
+# Databricks eXtensions
+.dbx/lock.json
+
+# local mlflow files
+mlruns/
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e1f0d9d
--- /dev/null
+++ b/README.md
@@ -0,0 +1,100 @@
+# cicd-sample-project
+
+This is a sample project for Databricks, generated via cookiecutter.
+
+To use this project, you need Python 3.x and `pip` or `conda` for package management.
+
+## Installing project requirements
+
+```bash
+pip install -r unit-requirements.txt
+```
+
+## Installing the project package in development mode
+
+```bash
+pip install -e .
+```
+
+## Testing
+
+For local unit testing, please use `pytest`:
+```
+pytest tests/unit --cov
+```
+
+For an integration test on an interactive cluster, use the following command:
+```
+dbx execute --cluster-name=<interactive-cluster-name> --job=cicd-sample-project-sample-integration-test
+```
+
+For a test on an automated job cluster, deploy the job files and then launch:
+```
+dbx deploy --jobs=cicd-sample-project-sample-integration-test --files-only
+dbx launch --job=cicd-sample-project-sample-integration-test --as-run-submit --trace
+```
+
+## Interactive execution and development
+
+1. `dbx` expects that the cluster used for interactive execution supports `%pip` and `%conda` magic [commands](https://docs.databricks.com/libraries/notebooks-python-libraries.html).
+2. Configure your job in the `conf/deployment.yml` file.
+3. To execute the code interactively, provide either `--cluster-id` or `--cluster-name`:
+```bash
+dbx execute \
+    --cluster-name="<some-cluster-name>" \
+    --job=job-name
+```
+
+Multiple users can use the same cluster for development. Libraries will be isolated per execution context.
+
+## Preparing the deployment file
+
+The next step is to configure your deployment objects. To keep this process simple and flexible, YAML is used for configuration.
+
+By default, the deployment configuration is stored in `conf/deployment.yml`.
+
+## Deployment for Run Submit API
+
+To deploy only the files without overriding the job definitions, run:
+
+```bash
+dbx deploy --files-only
+```
+
+To launch the file-based deployment:
+```
+dbx launch --as-run-submit --trace
+```
+
+This type of deployment is handy for working in different branches without affecting the main job definition.
+
+## Deployment for Run Now API
+
+To deploy the files and update the job definitions:
+
+```bash
+dbx deploy
+```
+
+To launch the file-based deployment:
+```
+dbx launch --job=<job-name>
+```
+
+This type of deployment should mainly be used from the CI pipeline in an automated way during a new release.
+
+
+## CICD pipeline settings
+
+Please set the following secrets or environment variables for your CI provider:
+- `DATABRICKS_HOST`
+- `DATABRICKS_TOKEN`
+
+## Testing and releasing via CI pipeline
+
+- To trigger the CI pipeline, simply push your code to the repository. If the CI provider is configured correctly, it will trigger the general testing pipeline.
+- To trigger the release pipeline, get the current version from the `cicd_sample_project/__init__.py` file and tag the current code version:
+```
+git tag -a v<new-version> -m "Release tag for version <new-version>"
+git push origin --tags
+```
diff --git a/cicd_sample_project/__init__.py b/cicd_sample_project/__init__.py
new file mode 100644
index 0000000..f102a9c
--- /dev/null
+++ b/cicd_sample_project/__init__.py
@@ -0,0 +1 @@
+__version__ = "0.0.1"
diff --git a/cicd_sample_project/common.py b/cicd_sample_project/common.py
new file mode 100644
index 0000000..17f13a9
--- /dev/null
+++ b/cicd_sample_project/common.py
@@ -0,0 +1,94 @@
+from abc import ABC, abstractmethod
+from argparse import ArgumentParser
+from logging import Logger
+from typing import Dict, Any
+import yaml
+import pathlib
+from pyspark.sql import SparkSession
+import sys
+
+
+# abstract class for jobs
+class Job(ABC):
+    def __init__(self, spark=None, init_conf=None):
+        self.spark = self._prepare_spark(spark)
+        self.logger = self._prepare_logger()
+        self.dbutils = self.get_dbutils()
+        if init_conf:
+            self.conf = init_conf
+        else:
+            self.conf = self._provide_config()
+        self._log_conf()
+
+    @staticmethod
+    def _prepare_spark(spark) -> SparkSession:
+        if not spark:
+            return SparkSession.builder.getOrCreate()
+        else:
+            return spark
+
+    @staticmethod
+    def _get_dbutils(spark: SparkSession):
+        try:
+            from pyspark.dbutils import DBUtils  # noqa
+
+            if "dbutils" not in locals():
+                utils = DBUtils(spark)
+                return utils
+            else:
+                return locals().get("dbutils")
+        except ImportError:
+            return None
+
+    def get_dbutils(self):
+        utils = self._get_dbutils(self.spark)
+
+        if not utils:
+            self.logger.warn("No DBUtils defined in the runtime")
+        else:
+            self.logger.info("DBUtils class initialized")
+
+        return utils
+
+    def _provide_config(self):
+        self.logger.info("Reading configuration from --conf-file job option")
+        conf_file = self._get_conf_file()
+        if not conf_file:
+            self.logger.info(
+                "No conf file was provided, setting configuration to empty dict. "
+                "Please override configuration in subclass init method"
+            )
+            return {}
+        else:
+            self.logger.info(f"Conf file was provided, reading configuration from {conf_file}")
+            return self._read_config(conf_file)
+
+    @staticmethod
+    def _get_conf_file():
+        p = ArgumentParser()
+        p.add_argument("--conf-file", required=False, type=str)
+        namespace = p.parse_known_args(sys.argv[1:])[0]
+        return namespace.conf_file
+
+    @staticmethod
+    def _read_config(conf_file) -> Dict[str, Any]:
+        config = yaml.safe_load(pathlib.Path(conf_file).read_text())
+        return config
+
+    def _prepare_logger(self) -> Logger:
+        log4j_logger = self.spark._jvm.org.apache.log4j  # noqa
+        return log4j_logger.LogManager.getLogger(self.__class__.__name__)
+
+    def _log_conf(self):
+        # log parameters
+        self.logger.info("Launching job with configuration parameters:")
+        for key, item in self.conf.items():
+            self.logger.info("\t Parameter: %-30s with value => %-30s" % (key, item))
+
+    @abstractmethod
+    def launch(self):
+        """
+        Main method of the job.
+        :return:
+        """
+        pass
diff --git a/cicd_sample_project/jobs/__init__.py b/cicd_sample_project/jobs/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/cicd_sample_project/jobs/sample/__init__.py b/cicd_sample_project/jobs/sample/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/cicd_sample_project/jobs/sample/entrypoint.py b/cicd_sample_project/jobs/sample/entrypoint.py
new file mode 100644
index 0000000..da097f6
--- /dev/null
+++ b/cicd_sample_project/jobs/sample/entrypoint.py
@@ -0,0 +1,25 @@
+from cicd_sample_project.common import Job
+
+
+class SampleJob(Job):
+
+    def launch(self):
+        self.logger.info("Launching sample job")
+
+        listing = self.dbutils.fs.ls("dbfs:/")
+
+        for l in listing:
+            self.logger.info(f"DBFS directory: {l}")
+
+        df = self.spark.range(0, 1000)
+
+        df.write.format(self.conf["output_format"]).mode("overwrite").save(
+            self.conf["output_path"]
+        )
+
+        self.logger.info("Sample job finished!")
+
+
+if __name__ == "__main__":
+    job = SampleJob()
+    job.launch()
diff --git a/conf/deployment.yml b/conf/deployment.yml
new file mode 100644
index 0000000..6b94e53
--- /dev/null
+++ b/conf/deployment.yml
@@ -0,0 +1,27 @@
+custom:
+  basic-cluster-props: &basic-cluster-props
+    spark_version: "9.1.x-cpu-ml-scala2.12"
+
+  basic-static-cluster: &basic-static-cluster
+    new_cluster:
+      <<: *basic-cluster-props
+      num_workers: 1
+      node_type_id: "Standard_E8_v3"
+
+# please note that we're using FUSE reference for config file, hence we're going to load this file using its local FS path
+environments:
+  default:
+    strict_path_adjustment_policy: true
+    jobs:
+      - name: "cicd-sample-project-sample"
+        <<:
+          - *basic-static-cluster
+        spark_python_task:
+          python_file: "file://cicd_sample_project/jobs/sample/entrypoint.py"
+          parameters: ["--conf-file", "file:fuse://conf/test/sample.yml"]
+      - name: "cicd-sample-project-sample-integration-test"
+        <<:
+          - *basic-static-cluster
+        spark_python_task:
+          python_file: "file://tests/integration/sample_test.py"
+          parameters: ["--conf-file", "file:fuse://conf/test/sample.yml"]
\ No newline at end of file
diff --git a/conf/test/sample.yml b/conf/test/sample.yml
new file mode 100644
index 0000000..a8b8bc8
--- /dev/null
+++ b/conf/test/sample.yml
@@ -0,0 +1,2 @@
+output_format: "delta"
+output_path: "dbfs:/dbx/tmp/test/cicd_sample_project"
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..d44a1cb
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,7 @@
+[pytest]
+addopts = -s -p no:warnings
+log_cli = 1
+log_cli_level = INFO
+log_cli_format = [pytest][%(asctime)s][%(levelname)s][%(module)s][%(funcName)s] %(message)s
+log_cli_date_format = %Y-%m-%d %H:%M:%S
+log_level = INFO
\ No newline at end of file
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..2a79f00
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,11 @@
+from setuptools import find_packages, setup
+from cicd_sample_project import __version__
+
+setup(
+    name="cicd_sample_project",
+    packages=find_packages(exclude=["tests", "tests.*"]),
+    setup_requires=["wheel"],
+    version=__version__,
+    description="",
+    author=""
+)
diff --git a/tests/integration/sample_test.py b/tests/integration/sample_test.py
new file mode 100644
index 0000000..5ba3f8d
--- /dev/null
+++ b/tests/integration/sample_test.py
@@ -0,0 +1,45 @@
+import unittest
+
+from cicd_sample_project.jobs.sample.entrypoint import SampleJob
+from uuid import uuid4
+from pyspark.dbutils import DBUtils  # noqa
+
+
+class SampleJobIntegrationTest(unittest.TestCase):
+    def setUp(self):
+
+        self.test_dir = "dbfs:/tmp/tests/sample/%s" % str(uuid4())
+        self.test_config = {"output_format": "delta", "output_path": self.test_dir}
+
+        self.job = SampleJob(init_conf=self.test_config)
+        self.dbutils = DBUtils(self.job.spark)
+        self.spark = self.job.spark
+
+    def test_sample(self):
+
+        self.job.launch()
+
+        output_count = (
+            self.spark.read.format(self.test_config["output_format"])
+            .load(self.test_config["output_path"])
+            .count()
+        )
+
+        self.assertGreater(output_count, 0)
+
+    def tearDown(self):
+        self.dbutils.fs.rm(self.test_dir, True)
+
+
+if __name__ == "__main__":
+    # please don't change the logic of test result checks here
+    # it's intentionally done in this way to comply with jobs run result checks
+    # for other tests, please simply replace the SampleJobIntegrationTest with your custom class name
+    loader = unittest.TestLoader()
+    tests = loader.loadTestsFromTestCase(SampleJobIntegrationTest)
+    runner = unittest.TextTestRunner(verbosity=2)
+    result = runner.run(tests)
+    if not result.wasSuccessful():
+        raise RuntimeError(
+            "One or multiple tests failed. Please check job logs for additional information."
+        )
diff --git a/tests/unit/sample_test.py b/tests/unit/sample_test.py
new file mode 100644
index 0000000..69304d9
--- /dev/null
+++ b/tests/unit/sample_test.py
@@ -0,0 +1,39 @@
+import unittest
+import tempfile
+import os
+import shutil
+
+from cicd_sample_project.jobs.sample.entrypoint import SampleJob
+from pyspark.sql import SparkSession
+from unittest.mock import MagicMock
+
+class SampleJobUnitTest(unittest.TestCase):
+    def setUp(self):
+        self.test_dir = tempfile.TemporaryDirectory().name
+        self.spark = SparkSession.builder.master("local[1]").getOrCreate()
+        self.test_config = {
+            "output_format": "parquet",
+            "output_path": os.path.join(self.test_dir, "output"),
+        }
+        self.job = SampleJob(spark=self.spark, init_conf=self.test_config)
+
+    def test_sample(self):
+        # feel free to add new methods to this magic mock to mock some particular functionality
+        self.job.dbutils = MagicMock()
+
+        self.job.launch()
+
+        output_count = (
+            self.spark.read.format(self.test_config["output_format"])
+            .load(self.test_config["output_path"])
+            .count()
+        )
+
+        self.assertGreater(output_count, 0)
+
+    def tearDown(self):
+        shutil.rmtree(self.test_dir)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/unit-requirements.txt b/unit-requirements.txt
new file mode 100644
index 0000000..73a1870
--- /dev/null
+++ b/unit-requirements.txt
@@ -0,0 +1,7 @@
+setuptools
+wheel
+pyspark==3.1.2
+pyyaml
+pytest
+pytest-cov
+dbx