Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-meta-jobs: Initial implementation #1249

Merged
merged 6 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions projects/vdk-plugins/vdk-meta-jobs/.plugin-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0

image: "python:3.7"

.build-vdk-meta-jobs:
variables:
PLUGIN_NAME: vdk-meta-jobs
extends: .build-plugin

build-py37-vdk-meta-jobs:
extends: .build-vdk-meta-jobs
image: "python:3.7"


build-py38-vdk-meta-jobs:
extends: .build-vdk-meta-jobs
image: "python:3.8"

build-py39-vdk-meta-jobs:
extends: .build-vdk-meta-jobs
image: "python:3.9"

build-py310-vdk-meta-jobs:
extends: .build-vdk-meta-jobs
image: "python:3.10"

release-vdk-meta-jobs:
variables:
PLUGIN_NAME: vdk-meta-jobs
extends: .release-plugin
160 changes: 160 additions & 0 deletions projects/vdk-plugins/vdk-meta-jobs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# Meta Jobs

Express dependencies between data jobs.

A plugin for Versatile Data Kit extends its Job API with an additional feature that allows users to trigger so-called Meta Jobs.

A meta job is a regular Data Job that invokes other Data Jobs using Control Service Execution API.
In this way, there's nothing different from other data jobs except for its purpose. See [Data Job types](https://github.com/vmware/versatile-data-kit/wiki/User-Guide#data-job-types) for more info.

It's meant to be a much more lightweight alternative to complex and comprehensive workflows solution (like Airflow)
as it doesn't require to provision any new infrastructure or to need to learn new tool.
You install a new python library (this plugin itself) and you are ready to go.

Using this plugin you can specify dependencies between data jobs as a direct acyclic graph (DAG). See usage for more information.

## Usage

```
pip install vdk-meta-jobs
```

Then one would create a single step and define the jobs we want to orchestrate

```python
def run(job_input):
jobs = [
{
"job_name": "name-of-job",
"team_name": "team-of-job",
"fail_meta_job_on_error": True or False,
"depends_on": [name-of-job1, name-of-job2]
},
...
]
MetaJobInput().run_meta_job(jobs)
```

When defining a job to be run following attributes are supported:
* **job_name**: required, the name of the data job
* **team_name:**: optional, the team of the data job. If omitted , it will use the meta job's team
* **fail_meta_job_on_error**: optional, default is true. if true, the meta job will abort and fail if the orchestrated job fails, if false, meta job won't fail and continue.
* **depends_on**: required (can be empty), list of other jobs that the orchestrated job depends on. The job will not be started until depends_on job have finished.


### Example

The following example dependency graph can be implemented with below code.


![img_2.png](img_2.png)

In this example what happens is
* Job 1 will execute.
* After Job 1 is completed, jobs 2,3,4 will start executing in parallel.
* Jobs 5 and 6 will start executing after job 3 completes, but will not wait for the completion of jobs 2 and 4.


```python

from vdk.api.job_input import IJobInput
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput

JOBS_RUN_ORDER = [
{
"job_name": "job1",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"depends_on": []
},

{
"job_name": "job2",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"depends_on": ["job1"]
},
{
"job_name": "job3",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"depends_on": ["job1"]
},
{
"job_name": "job4",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"depends_on": ["job1"]
},

{
"job_name": "job5",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"depends_on": ["job3"]
},
{
"job_name": "job6",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"depends_on": ["job3"]
},
]


def run(job_input: IJobInput) - > None:
MetaJobInput().run_meta_job(JOBS_RUN_ORDER)
```

### FAQ


**Q: Will the metajob retry on Platform Error?**<br>
A: Yes, as any other job, up to N (configurable by the Control Service) attempts for each job it is orchestrating.
See Control Service documentation for more information

**Q: If an orchestrated job fails, will the meta job fail?**<br>
Only if fail_meta_job_on_error flag is set to True (which is teh default setting if omited)

The meta job then will fail with USER error (regardless of how the orchestrated job failed)


**Q: Am I able to run the metajob locally?**<br>
A: Yes, but the jobs orchestrated must be deployed to the cloud (by the Control Service).

**Q: Is there memory limit of the meta job?**<br>
A: The normal per job limits apply for any jobs orchestrated/started by the meta job.

**Q: Is there execution time limit of the meta job?**<br>
A: Yes, the meta job must finish within the same limit as any normal data job.
The total time of all data jobs started by the meta job must be less than the limit specified.
The overall limit is controlled by Control Service administrators

**Q: Is the metajob going to fail and not trigger the remaining jobs if any of the jobs it is orchestrating fails?**<br>
A: This is configurable by the end user in the parameter fail_meta_job_on_error

**Q: Can I schedule one job to run every hour and use it in the meta job at the same time?**<br>
A: Yes, if the job is already running, the metajob will wait for the concurrent run to finish and then trigger the job again from the meta job,
If the job is already running as part of the meta job, the concurrent scheduled run will be skipped


### Build and testing

```
pip install -r requirements.txt
pip install -e .
pytest
```

In VDK repo [../build-plugin.sh](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/build-plugin.sh) script can be used also.


#### Note about the CICD:

.plugin-ci.yaml is needed only for plugins part of [Versatile Data Kit Plugin repo](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins).

The CI/CD is separated in two stages, a build stage and a release stage.
The build stage is made up of a few jobs, all which inherit from the same
job configuration and only differ in the Python version they use (3.7, 3.8, 3.9 and 3.10).
They run according to rules, which are ordered in a way such that changes to a
plugin's directory trigger the plugin CI, but changes to a different plugin does not.
Binary file added projects/vdk-plugins/vdk-meta-jobs/img_2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
11 changes: 11 additions & 0 deletions projects/vdk-plugins/vdk-meta-jobs/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# this file is used to provide testing requirements
# for requirements (dependencies) needed during installation update setup.py install_requires section


pytest
pytest-httpserver
vdk-control-api-auth
vdk-control-service-api
vdk-core
vdk-plugin-control-cli
vdk-test-utils
40 changes: 40 additions & 0 deletions projects/vdk-plugins/vdk-meta-jobs/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import pathlib

import setuptools

"""
Builds a package with the help of setuptools in order for this package to be imported in other projects
"""

__version__ = "0.1.0"

setuptools.setup(
name="vdk-meta-jobs",
version=__version__,
url="https://github.com/vmware/versatile-data-kit",
description="Express dependecies between data jobs.",
long_description=pathlib.Path("README.md").read_text(),
long_description_content_type="text/markdown",
install_requires=[
"vdk-core",
"graphlib-backport",
"vdk-control-api-auth",
"vdk-plugin-control-cli",
"vdk-control-service-api",
],
package_dir={"": "src"},
packages=setuptools.find_namespace_packages(where="src"),
# This is the only vdk plugin specifc part
# Define entry point called "vdk.plugin.run" with name of plugin and module to act as entry point.
entry_points={"vdk.plugin.run": ["meta-jobs = vdk.plugin.meta_jobs.plugin_entry"]},
classifiers=[
"Development Status :: 2 - Pre-Alpha",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import abc
from abc import abstractmethod
from dataclasses import dataclass
from dataclasses import field
from typing import List


@dataclass
class SingleJob:
"""
TODO
"""

job_name: str
team_name: str = None
fail_meta_job_on_error: bool = True
depends_on: List[str] = field(default_factory=list)


@dataclass
class MetaJob(SingleJob):
"""
TODO
"""

jobs: List[SingleJob] = field(default_factory=list)


class IMetaJobInput(abc.ABC):
@abstractmethod
def run_meta_job(self, meta_job: MetaJob):
"""
TODO
:param meta_job:
:return:
"""
pass
Loading