diff --git a/projects/vdk-plugins/vdk-meta-jobs/README.md b/projects/vdk-plugins/vdk-meta-jobs/README.md index ee65fffb51..6625529098 100644 --- a/projects/vdk-plugins/vdk-meta-jobs/README.md +++ b/projects/vdk-plugins/vdk-meta-jobs/README.md @@ -1,10 +1,17 @@ # Meta Jobs -Express dependecies between data jobs. +Express dependencies between data jobs. -A plugin for Versatile Data Kit which extends its Job API with an additional feature which allows users to trigger so called Meta Jobs. -Meta Jobs are data jobs which trigger one or more other jobs, wait for their completion, and then trigger another set of jobs until either the entire job pipeline has succeeded. +A plugin for Versatile Data Kit extends its Job API with an additional feature that allows users to trigger so-called Meta Jobs. +A meta job is a regular Data Job that invokes other Data Jobs using Control Service Execution API. +In this way, there's nothing different from other data jobs except for its purpose. See [Data Job types](https://github.com/vmware/versatile-data-kit/wiki/User-Guide#data-job-types) for more info. + +It's meant to be a much more lightweight alternative to complex and comprehensive workflows solution (like Airflow) +as it doesn't require to provision any new infrastructure or to need to learn new tool. +You install a new python library (this plugin itself) and you are ready to go. + +Using this plugin you can specify dependencies between data jobs as a direct acyclic graph (DAG). See usage for more information. ## Usage @@ -42,6 +49,12 @@ The following example dependency graph can be implemented with below code. ![img_2.png](img_2.png) +In this example what happens is +* Job 1 will execute. +* After Job 1 is completed, jobs 2,3,4 will start executing in parallel. +* Jobs 5 and 6 will start executing after job 3 completes, but will not wait for the completion of jobs 2 and 4. + + ```python from vdk.api.job_input import IJobInput diff --git a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_dag.py b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_dag.py index 26c8f053b4..ef8500f166 100644 --- a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_dag.py +++ b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_dag.py @@ -5,6 +5,7 @@ import os import pprint import sys +import time from graphlib import TopologicalSorter from typing import Any from typing import Dict @@ -33,6 +34,9 @@ def __init__(self, team_name: str): ) ), ) + self._dag_execution_check_time_period_seconds = int( + os.environ.get("VDK_META_JOBS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS", 10) + ) self._finished_jobs = [] self._job_executor = TrackingDataJobExecutor(RemoteDataJobExecutor()) @@ -53,12 +57,17 @@ def execute_dag(self): self._start_job(node) self._start_delayed_jobs() - for node in self._get_finalized_jobs(): + finalized_jobs = self._get_finalized_jobs() + for node in finalized_jobs: if node not in self._finished_jobs: log.info(f"Data Job {node} has finished.") self._topological_sorter.done(node) self._job_executor.finalize_job(node) self._finished_jobs.append(node) + if not finalized_jobs: + # No jobs are finished at this iteration so let's wait a bit to let them + # finish + time.sleep(self._dag_execution_check_time_period_seconds) def _start_delayed_jobs(self): while True: diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py b/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py index 67429e05a5..c3e85f9e09 100644 --- a/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py @@ -221,6 +221,7 @@ def test_meta_job_long_running(httpserver: PluginHTTPServer): "VDK_CONTROL_SERVICE_REST_API_URL": api_url, # we set 5 seconds more than execution duration of 3 set above "VDK_META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS": "5", + "VDK_META_JOBS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS": "0", }, ): # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a away to simulate vdk command