vdk-meta-jobs: address review comments
Signed-off-by: Antoni Ivanov <[email protected]>
antoniivanov committed Oct 28, 2022
1 parent 8084fe6 commit 23de03d
Showing 3 changed files with 27 additions and 4 deletions.
19 changes: 16 additions & 3 deletions projects/vdk-plugins/vdk-meta-jobs/README.md
@@ -1,10 +1,17 @@
# Meta Jobs

Express dependecies between data jobs.
Express dependencies between data jobs.

A plugin for Versatile Data Kit which extends its Job API with an additional feature which allows users to trigger so called Meta Jobs.
Meta Jobs are data jobs which trigger one or more other jobs, wait for their completion, and then trigger another set of jobs until either the entire job pipeline has succeeded.
A plugin for Versatile Data Kit extends its Job API with an additional feature that allows users to trigger so-called Meta Jobs.

A meta job is a regular Data Job that invokes other Data Jobs using the Control Service Execution API.
In that sense, it differs from other data jobs only in its purpose. See [Data Job types](https://github.com/vmware/versatile-data-kit/wiki/User-Guide#data-job-types) for more info.

It's meant to be a much more lightweight alternative to complex and comprehensive workflow solutions (like Airflow),
as it doesn't require provisioning any new infrastructure or learning a new tool.
You install a new Python library (this plugin itself) and you are ready to go.

Using this plugin you can specify dependencies between data jobs as a directed acyclic graph (DAG). See Usage for more information.

## Usage

@@ -42,6 +49,12 @@ The following example dependency graph can be implemented with below code.

![img_2.png](img_2.png)

In this example:
* Job 1 executes first.
* After Job 1 completes, Jobs 2, 3, and 4 start executing in parallel.
* Jobs 5 and 6 start executing after Job 3 completes; they do not wait for Jobs 2 and 4 to complete.
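
The execution order described in the list above can be sketched with the standard-library `graphlib.TopologicalSorter`, which the plugin's DAG code also imports. This is only an illustration under stated assumptions: the job names `job1` to `job6` and the helper `ready_batches_after` are hypothetical and are not part of the plugin API.

```python
from graphlib import TopologicalSorter

# Hypothetical job names; each key maps to the set of jobs it depends on.
dependencies = {
    "job2": {"job1"},
    "job3": {"job1"},
    "job4": {"job1"},
    "job5": {"job3"},
    "job6": {"job3"},
}


def ready_batches_after(finish_order):
    """Return the jobs that become ready initially and after each completion."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    # Jobs without dependencies are ready immediately.
    batches = [sorted(sorter.get_ready())]
    for job in finish_order:
        # Marking a job as done unblocks the jobs that depend only on it.
        sorter.done(job)
        batches.append(sorted(sorter.get_ready()))
    return batches


# job1 finishes first, then job3 (job2 and job4 may still be running).
batches = ready_batches_after(["job1", "job3"])
print(batches)
```

Jobs 5 and 6 appear in the last batch even though Jobs 2 and 4 were never marked as done, which matches the third bullet.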


```python

from vdk.api.job_input import IJobInput
@@ -5,6 +5,7 @@
import os
import pprint
import sys
import time
from graphlib import TopologicalSorter
from typing import Any
from typing import Dict
@@ -33,6 +34,9 @@ def __init__(self, team_name: str):
                )
            ),
        )
        self._dag_execution_check_time_period_seconds = int(
            os.environ.get("VDK_META_JOBS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS", 10)
        )
        self._finished_jobs = []
        self._job_executor = TrackingDataJobExecutor(RemoteDataJobExecutor())

@@ -53,12 +57,17 @@ def execute_dag(self):
                self._start_job(node)
            self._start_delayed_jobs()

            for node in self._get_finalized_jobs():
            finalized_jobs = self._get_finalized_jobs()
            for node in finalized_jobs:
                if node not in self._finished_jobs:
                    log.info(f"Data Job {node} has finished.")
                    self._topological_sorter.done(node)
                    self._job_executor.finalize_job(node)
                    self._finished_jobs.append(node)
            if not finalized_jobs:
                # No jobs are finished at this iteration so let's wait a bit to let them
                # finish
                time.sleep(self._dag_execution_check_time_period_seconds)

    def _start_delayed_jobs(self):
        while True:
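
The polling change in `execute_dag` can be sketched in isolation as follows. This is a simplified sketch, not the plugin's code: `FakeExecutor`, `run_dag`, and the `period_seconds` and `sleep` parameters are hypothetical stand-ins introduced so the loop can run without a Control Service. Only the environment variable name and its default of 10 come from the diff.

```python
import os
import time
from graphlib import TopologicalSorter


class FakeExecutor:
    """Hypothetical executor: a job finishes on the second poll after it starts."""

    def __init__(self):
        self.started = []
        self._polls_left = {}

    def start(self, job):
        self.started.append(job)
        self._polls_left[job] = 2

    def poll(self):
        """Return the jobs that finished during this poll."""
        done = []
        for job in list(self._polls_left):
            self._polls_left[job] -= 1
            if self._polls_left[job] == 0:
                done.append(job)
                del self._polls_left[job]
        return done


def run_dag(dependencies, executor, sleep=time.sleep, period_seconds=None):
    """Start jobs in dependency order; sleep only when a poll finds nothing."""
    if period_seconds is None:
        period_seconds = int(
            os.environ.get("VDK_META_JOBS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS", 10)
        )
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    finished = []
    while sorter.is_active():
        for job in sorter.get_ready():
            executor.start(job)
        finalized = executor.poll()
        for job in finalized:
            if job not in finished:
                sorter.done(job)
                finished.append(job)
        if not finalized:
            # Nothing finished in this iteration, so wait before polling again.
            sleep(period_seconds)
    return finished


executor = FakeExecutor()
sleeps = []  # records each requested sleep instead of actually sleeping
finished = run_dag({"b": {"a"}}, executor, sleep=sleeps.append, period_seconds=1)
print(finished, sleeps)
```

Without the sleep, the loop would poll continuously while jobs are still running; with it, the loop pauses only on iterations where no job has finished.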
1 change: 1 addition & 0 deletions projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py
@@ -221,6 +221,7 @@ def test_meta_job_long_running(httpserver: PluginHTTPServer):
            "VDK_CONTROL_SERVICE_REST_API_URL": api_url,
            # we set 5 seconds, which is more than the execution duration of 3 seconds set above
            "VDK_META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS": "5",
            "VDK_META_JOBS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS": "0",
        },
    ):
        # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a way to simulate vdk command
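
Setting the new variable to `"0"` in the test removes the wait between DAG checks. A minimal sketch of how such an override interacts with the `int(os.environ.get(..., 10))` lookup from the diff is shown below. It assumes the test patches `os.environ` with `unittest.mock.patch.dict`, which the visible hunk suggests but does not show; the helper `check_period_seconds` is hypothetical.

```python
import os
from unittest import mock

ENV_VAR = "VDK_META_JOBS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS"


def check_period_seconds() -> int:
    """Hypothetical helper: read the period as the diff does, defaulting to 10."""
    return int(os.environ.get(ENV_VAR, 10))


# Inside the patch the override applies; environment values are always strings.
with mock.patch.dict(os.environ, {ENV_VAR: "0"}):
    period_in_test = check_period_seconds()

# With the variable unset, the default applies. patch.dict restores the
# original environment on exit, so the pop below does not leak.
with mock.patch.dict(os.environ):
    os.environ.pop(ENV_VAR, None)
    default_period = check_period_seconds()

print(period_in_test, default_period)
```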
