
vdk-meta-jobs: address review comments
Signed-off-by: Antoni Ivanov <[email protected]>
antoniivanov committed Nov 1, 2022
1 parent 15893a5 commit ae9a862
Showing 6 changed files with 131 additions and 18 deletions.
22 changes: 18 additions & 4 deletions projects/vdk-plugins/vdk-meta-jobs/README.md
@@ -1,10 +1,17 @@
# Meta Jobs

Express dependencies between data jobs.

A plugin for Versatile Data Kit that extends its Job API with a feature allowing users to trigger so-called Meta Jobs.

A meta job is a regular Data Job that invokes other Data Jobs using the Control Service Execution API.
Otherwise, it is no different from other data jobs except in its purpose. See [Data Job types](https://github.com/vmware/versatile-data-kit/wiki/User-Guide#data-job-types) for more info.

It's meant to be a much more lightweight alternative to complex and comprehensive workflow solutions (like Airflow),
as it doesn't require provisioning any new infrastructure or learning a new tool.
You install a new Python library (this plugin itself) and you are ready to go.

Using this plugin you can specify dependencies between data jobs as a directed acyclic graph (DAG). See Usage for more information.
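For illustration (a sketch only — the job names are made up, and the `job_name`/`depends_on` dictionary keys mirror those used in this plugin's tests), a diamond-shaped DAG of four jobs could be declared as:

```python
# Hypothetical job names; the job_name/depends_on keys follow the shape
# used in this plugin's own tests.
jobs = [
    dict(job_name="ingest", depends_on=[]),
    dict(job_name="transform-a", depends_on=["ingest"]),
    dict(job_name="transform-b", depends_on=["ingest"]),
    dict(job_name="publish", depends_on=["transform-a", "transform-b"]),
]

# Sanity check: every dependency refers to a declared job (the graph
# must also be acyclic for the DAG to be executable).
declared = {job["job_name"] for job in jobs}
assert all(dep in declared for job in jobs for dep in job["depends_on"])
```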

## Usage

@@ -42,6 +49,12 @@ The following example dependency graph can be implemented with the code below.

![img_2.png](img_2.png)

In this example:
* Job 1 will execute first.
* After Job 1 completes, jobs 2, 3, and 4 will start executing in parallel.
* Jobs 5 and 6 will start executing after job 3 completes, without waiting for jobs 2 and 4 to finish.


```python

from vdk.api.job_input import IJobInput
@@ -97,7 +110,8 @@ def run(job_input: IJobInput) -> None:


**Q: Will the metajob retry on Platform Error?**<br>
A: Yes, like any other job, up to N (configurable by the Control Service) attempts for each job it is orchestrating.
See the Control Service documentation for more information.

**Q: If an orchestrated job fails, will the meta job fail?**<br>
A: Only if the `fail_meta_job_on_error` flag is set to True (which is the default setting if omitted).
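A sketch of how the flag might look in a job list (the per-job placement of `fail_meta_job_on_error` is an assumption here; check the plugin docs for the exact shape):

```python
# fail_meta_job_on_error is assumed here to be a per-job key; per the FAQ
# above, it defaults to True when omitted.
jobs = [
    dict(job_name="job1", depends_on=[]),
    # If job2 fails, the meta job continues instead of failing as well.
    dict(job_name="job2", depends_on=["job1"], fail_meta_job_on_error=False),
]
```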
@@ -6,7 +6,6 @@
import time
from typing import Dict

from taurus_datajob_api import ApiException
from vdk.internal.core.errors import ErrorMessage
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.meta_jobs.meta import IDataJobExecutor
@@ -5,6 +5,7 @@
import os
import pprint
import sys
import time
from graphlib import TopologicalSorter
from typing import Any
from typing import Dict
@@ -33,7 +34,9 @@ def __init__(self, team_name: str):
)
),
)
self._finished_jobs = []
self._dag_execution_check_time_period_seconds = int(
os.environ.get("VDK_META_JOBS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS", 10)
)
self._job_executor = TrackingDataJobExecutor(RemoteDataJobExecutor())

def build_dag(self, jobs: List[Dict]):
@@ -53,12 +56,18 @@ def execute_dag(self):
self._start_job(node)
self._start_delayed_jobs()

for node in self._get_finalized_jobs():
if node not in self._finished_jobs:
log.info(f"Data Job {node} has finished.")
self._topological_sorter.done(node)
self._job_executor.finalize_job(node)
self._finished_jobs.append(node)
finished_jobs = self._job_executor.get_finished_job_names()
self._finalize_jobs(finished_jobs)
if not finished_jobs:
# No jobs are finished at this iteration so let's wait a bit to let them
# finish
time.sleep(self._dag_execution_check_time_period_seconds)

def _finalize_jobs(self, finalized_jobs):
for node in finalized_jobs:
log.info(f"Data Job {node} has finished.")
self._topological_sorter.done(node)
self._job_executor.finalize_job(node)

def _start_delayed_jobs(self):
while True:
@@ -104,6 +113,3 @@ def _start_job(self, node):
self._delayed_starting_jobs.enqueue(node)
else:
raise

def _get_finalized_jobs(self) -> List:
return self._job_executor.get_finished_job_names()
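The loop this diff arrives at — start ready jobs, finalize finished ones, sleep only when nothing finished — can be sketched in isolation with `graphlib.TopologicalSorter` (a simplified sketch; plain callbacks stand in for the real `TrackingDataJobExecutor`):

```python
import time
from graphlib import TopologicalSorter


def execute_dag(graph, start_job, get_finished, finalize, check_period=10):
    """graph maps each job to the set of jobs it depends on."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    while sorter.is_active():
        for node in sorter.get_ready():
            start_job(node)
        finished = get_finished()
        for node in finished:
            sorter.done(node)  # unblocks the node's dependents
            finalize(node)
        if not finished:
            # nothing finished this iteration: sleep instead of busy-looping
            time.sleep(check_period)


# Fake executor: everything that has started is immediately "finished".
started, finalized = [], []
execute_dag(
    {"job2": {"job1"}, "job3": {"job1"}},
    start_job=started.append,
    get_finished=lambda: [j for j in started if j not in finalized],
    finalize=finalized.append,
)
# job1 is finalized before job2/job3, which both depend on it
assert finalized[0] == "job1" and set(finalized) == {"job1", "job2", "job3"}
```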
@@ -1,8 +1,5 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from abc import ABC
from typing import Dict

from vdk.internal.control.configuration.vdk_config import VDKConfig
from vdk.plugin.meta_jobs.meta import IDataJobExecutor
from vdk.plugin.meta_jobs.remote_data_job import RemoteDataJob
69 changes: 69 additions & 0 deletions projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_dag.py
@@ -0,0 +1,69 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import time
from unittest.mock import call
from unittest.mock import MagicMock

from vdk.plugin.meta_jobs.cached_data_job_executor import TrackingDataJobExecutor
from vdk.plugin.meta_jobs.meta_dag import MetaJobsDag

# Overall we eschew unit tests in favor of the functional tests in test_meta_job.
# Still, some functionality is more easily tested with unit tests, so we add some here.


def test_execute_dag_happy_case():
job1 = dict(job_name="job1", depends_on=[])
job2 = dict(job_name="job2", depends_on=["job1"])
job3 = dict(job_name="job3", depends_on=["job1"])
job4 = dict(job_name="job4", depends_on=["job2", "job3"])
jobs = [job1, job2, job3, job4]

dag = MetaJobsDag("team")
dag.build_dag(jobs)
dag._job_executor = MagicMock(spec=TrackingDataJobExecutor)
dag._job_executor.get_finished_job_names.side_effect = [
["job1"],
["job2", "job3"],
["job4"],
]

dag.execute_dag()

assert [
call("job1"),
call("job2"),
call("job3"),
call("job4"),
] == dag._job_executor.start_job.call_args_list


def test_execute_dag_busyloop():
job1 = dict(job_name="job1", depends_on=[])
job2 = dict(job_name="job2", depends_on=["job1"])
job3 = dict(job_name="job3", depends_on=["job1"])
jobs = [job1, job2, job3]

dag = MetaJobsDag("team")
dag.build_dag(jobs)
dag._job_executor = MagicMock(spec=TrackingDataJobExecutor)
dag._dag_execution_check_time_period_seconds = 3

calls = [0]
start_time = [0]

def mock_get_finished_job_names(*args, **kwargs):
calls[0] += 1
if calls[0] == 1:
start_time[0] = time.time()
return ["job1"]
elif time.time() - start_time[0] > 2:
return ["job2", "job3"]
else:
return []

dag._job_executor.get_finished_job_names.side_effect = mock_get_finished_job_names

dag.execute_dag()

# check for a busy loop (this would have been called hundreds of times if there were a busy-loop bug)
assert calls[0] <= 4
28 changes: 28 additions & 0 deletions projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py
@@ -9,6 +9,7 @@
from pytest_httpserver.pytest_plugin import PluginHTTPServer
from taurus_datajob_api import DataJobDeployment
from taurus_datajob_api import DataJobExecution
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.meta_jobs import plugin_entry
from vdk.plugin.test_utils.util_funcs import cli_assert_equal
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner
@@ -221,6 +222,7 @@ def test_meta_job_long_running(httpserver: PluginHTTPServer):
"VDK_CONTROL_SERVICE_REST_API_URL": api_url,
# we set 5 seconds, more than the execution duration of 3 seconds set above
"VDK_META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS": "5",
"VDK_META_JOBS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS": "0",
},
):
# CliEntryBasedTestRunner (provided by vdk-test-utils) gives a way to simulate the vdk command
@@ -247,3 +249,29 @@ def test_meta_job_long_running(httpserver: PluginHTTPServer):
# If the new count is not that big, we can edit it here to make the test pass;
# if the new count is too big, we have an issue that needs to be investigated.
assert len(httpserver.log) == 17


def test_meta_job_circular_dependency(httpserver: PluginHTTPServer):
jobs = [
("job1", [200], "succeeded"),
("job2", [200], "succeeded"),
("job3", [200], "succeeded"),
("job4", [200], "succeeded"),
]
api_url = _prepare(httpserver, jobs)

with mock.patch.dict(
os.environ,
{"VDK_CONTROL_SERVICE_REST_API_URL": api_url},
):
# CliEntryBasedTestRunner (provided by vdk-test-utils) gives a way to simulate the vdk command
# and mock large parts of it - e.g. pass our own plugins
runner = CliEntryBasedTestRunner(plugin_entry)

result: Result = runner.invoke(
["run", jobs_path_from_caller_directory("meta-job-circular-dep")]
)
cli_assert_equal(1, result)
# no requests should be made as the meta job fails before starting any job
assert isinstance(result.exception, UserCodeError)
assert len(httpserver.log) == 0
