From 9b742e316f14e71d4fb9a5f0ec7b7a6b6c5cba14 Mon Sep 17 00:00:00 2001 From: Yoan Salambashev <36246462+yonitoo@users.noreply.github.com> Date: Thu, 30 Mar 2023 17:28:37 +0300 Subject: [PATCH] vdk-meta-jobs: Meta Jobs DAG validation (#1785) Introduce a DagValidator class that properly validates the Meta Jobs DAG before it is built. It checks for: * DAG structure (spelling mistakes, all the types of the job dict keys) * duplicate jobs * DAG cycles Rework the test_meta_job tests by introducing a TestMetaJob class that contains all the existing ones as well as some new validation-related ones. Tested-by: all the existing tests pass and introduced some new unit tests for the different cases Signed-off-by: Yoan Salambashev --------- Signed-off-by: Yoan Salambashev Co-authored-by: Yoan Salambashev Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: github-actions <> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- .../src/vdk/plugin/meta_jobs/dag_validator.py | 158 +++++ .../src/vdk/plugin/meta_jobs/meta_dag.py | 5 +- .../meta-job-depends-on-itself/config.ini | 2 + .../jobs/meta-job-depends-on-itself/meta.py | 18 + .../jobs/meta-job-duplicate-jobs/config.ini | 2 + .../jobs/meta-job-duplicate-jobs/meta.py | 18 + .../meta-job-not-allowed-job-key/config.ini | 2 + .../jobs/meta-job-not-allowed-job-key/meta.py | 18 + .../meta-job-wrong-job-key-type/config.ini | 2 + .../jobs/meta-job-wrong-job-key-type/meta.py | 18 + .../jobs/meta-job-wrong-job-type/config.ini | 2 + .../jobs/meta-job-wrong-job-type/meta.py | 18 + .../config.ini | 2 + .../meta-job-wrong-topological-order/meta.py | 18 + .../vdk-meta-jobs/tests/test_meta_job.py | 620 +++++++++--------- 15 files changed, 585 insertions(+), 318 deletions(-) create mode 100644 projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/dag_validator.py create mode 100644 
projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-depends-on-itself/config.ini create mode 100644 projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-depends-on-itself/meta.py create mode 100644 projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-duplicate-jobs/config.ini create mode 100644 projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-duplicate-jobs/meta.py create mode 100644 projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-not-allowed-job-key/config.ini create mode 100644 projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-not-allowed-job-key/meta.py create mode 100644 projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-key-type/config.ini create mode 100644 projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-key-type/meta.py create mode 100644 projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-type/config.ini create mode 100644 projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-type/meta.py create mode 100644 projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-topological-order/config.ini create mode 100644 projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-topological-order/meta.py diff --git a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/dag_validator.py b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/dag_validator.py new file mode 100644 index 0000000000..19c8233631 --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/dag_validator.py @@ -0,0 +1,158 @@ +# Copyright 2023-2023 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 +import graphlib +import logging +from collections import namedtuple +from typing import Dict +from typing import List + +from vdk.internal.core.errors import ErrorMessage +from vdk.internal.core.errors import UserCodeError + +log = logging.getLogger(__name__) +Error = namedtuple("Error", ["TYPE", "PERMISSION", "REQUIREMENT", "CONFLICT"]) +ERROR = Error( + TYPE="type", PERMISSION="permission", REQUIREMENT="requirement", CONFLICT="conflict" +) +allowed_job_keys = {"job_name", "team_name", "fail_meta_job_on_error", "depends_on"} +required_job_keys = {"job_name", "depends_on"} + + +class DagValidator: + """ + The purpose of this class is to validate the DAG structure and jobs. + It is being used right before the DAG is built. + """ + + def validate(self, jobs: List[Dict]): + """ + Validate the structure and the order of the DAG of Data Jobs. + :param jobs: List of Data Jobs (DAG vertices) to be validated + :return: + """ + self._validate_no_duplicates(jobs) + for job in jobs: + self._validate_job(job) + self._check_dag_cycles(jobs) + log.info("Successfully validated the DAG!") + + def _raise_error( + self, jobs: List[Dict], error_type: str, reason: str, countermeasures: str + ): + raise UserCodeError( + ErrorMessage( + "", + "Meta Job failed due to a Data Job validation failure.", + f"There is a {error_type} error with job(s) {jobs}. " + reason, + "The DAG will not be built and the Meta Job will fail.", + countermeasures, + ) + ) + + def _validate_no_duplicates(self, jobs: List[Dict]): + duplicated_jobs = list({job["job_name"] for job in jobs if jobs.count(job) > 1}) + if duplicated_jobs: + self._raise_error( + duplicated_jobs, + ERROR.CONFLICT, + f"There are some duplicated jobs: {duplicated_jobs}.", + f"Remove the duplicated jobs from the list - each job can appear in the jobs list at most once. 
" + f"Duplicated jobs: {duplicated_jobs}.", + ) + + def _validate_job(self, job: Dict): + self._validate_allowed_and_required_keys(job) + self._validate_job_name(job) + self._validate_dependencies(job) + self._validate_team_name(job) + self._validate_fail_meta_job_on_error(job) + log.info(f"Successfully validated job: {job['job_name']}") + + def _validate_allowed_and_required_keys(self, job: Dict): + forbidden_keys = [key for key in job.keys() if key not in allowed_job_keys] + if forbidden_keys: + self._raise_error( + list(job), + ERROR.PERMISSION, + "One or more job dict keys are not allowed.", + f"Remove the forbidden Data Job Dict keys. " + f"Keys {forbidden_keys} are forbidden. Allowed keys: {allowed_job_keys}.", + ) + missing_keys = [key for key in required_job_keys if key not in job] + if missing_keys: + self._raise_error( + list(job), + ERROR.REQUIREMENT, + "One or more job dict required keys are missing.", + f"Add the missing required Data Job Dict keys. Keys {missing_keys} " + f"are missing. Required keys: {required_job_keys}.", + ) + + def _validate_job_name(self, job: Dict): + if not isinstance(job["job_name"], str): + self._raise_error( + list(job), + ERROR.TYPE, + "The type of the job dict key job_name is not string.", + f"Change the Data Job Dict value of job_name. " + f"Current type is {type(job['job_name'])}. Expected type is string.", + ) + + def _validate_dependencies(self, job: Dict): + if not (isinstance(job["depends_on"], List)): + self._raise_error( + list(job), + ERROR.TYPE, + "The type of the job dict depends_on key is not list.", + f"Check the Data Job Dict type of the depends_on key. Current type " + f"is {type(job['depends_on'])}. 
Expected type is list.", + ) + non_string_dependencies = [ + pred for pred in job["depends_on"] if not isinstance(pred, str) + ] + if non_string_dependencies: + self._raise_error( + list(job), + ERROR.TYPE, + "One or more items of the job dependencies list are not strings.", + f"Check the Data Job Dict values of the depends_on list. " + f"There are some non-string values: {non_string_dependencies}. Expected type is string.", + ) + + def _validate_team_name(self, job: Dict): + if "team_name" in job and not isinstance(job["team_name"], str): + self._raise_error( + list(job), + ERROR.TYPE, + "The type of the job dict key job_name is not string.", + f"Change the Data Job Dict value of team_name. " + f"Current type is {type(job['team_name'])}. Expected type is string.", + ) + + def _validate_fail_meta_job_on_error(self, job: Dict): + if "fail_meta_job_on_error" in job and not isinstance( + (job["fail_meta_job_on_error"]), bool + ): + self._raise_error( + list(job), + ERROR.TYPE, + "The type of the job dict key fail_meta_job_on_error is not bool (True/False).", + f"Change the Data Job Dict value of fail_meta_job_on_error. Current type" + f" is {type(job['fail_meta_job_on_error'])}. 
Expected type is bool.", + ) + + def _check_dag_cycles(self, jobs: List[Dict]): + topological_sorter = graphlib.TopologicalSorter() + for job in jobs: + topological_sorter.add(job["job_name"], *job["depends_on"]) + + try: + # Preparing the sorter raises CycleError if cycles exist + topological_sorter.prepare() + except graphlib.CycleError as e: + self._raise_error( + e.args[1][:-1], + ERROR.CONFLICT, + "There is a cycle in the DAG.", + f"Change the depends_on list of the jobs that participate in the detected cycle: {e.args[1]}.", + ) diff --git a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_dag.py b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_dag.py index b6742182cf..4732895c99 100644 --- a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_dag.py +++ b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_dag.py @@ -2,7 +2,6 @@ # SPDX-License-Identifier: Apache-2.0 import json import logging -import os import pprint import sys import time @@ -13,6 +12,7 @@ from taurus_datajob_api import ApiException from vdk.plugin.meta_jobs.cached_data_job_executor import TrackingDataJobExecutor +from vdk.plugin.meta_jobs.dag_validator import DagValidator from vdk.plugin.meta_jobs.meta import TrackableJob from vdk.plugin.meta_jobs.meta_configuration import MetaPluginConfiguration from vdk.plugin.meta_jobs.remote_data_job_executor import RemoteDataJobExecutor @@ -40,10 +40,11 @@ def __init__(self, team_name: str, meta_config: MetaPluginConfiguration): executor=RemoteDataJobExecutor(), time_between_status_check_seconds=meta_config.meta_jobs_time_between_status_check_seconds(), ) + self._dag_validator = DagValidator() def build_dag(self, jobs: List[Dict]): + self._dag_validator.validate(jobs) for job in jobs: - # TODO: add some job validation here; check the job exists, its previous jobs exists, etc trackable_job = TrackableJob( job["job_name"], job.get("team_name", self._team_name), diff --git 
a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-depends-on-itself/config.ini b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-depends-on-itself/config.ini new file mode 100644 index 0000000000..3b1eb64c8a --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-depends-on-itself/config.ini @@ -0,0 +1,2 @@ +[owner] +team = team-awesome diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-depends-on-itself/meta.py b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-depends-on-itself/meta.py new file mode 100644 index 0000000000..1a4ada3de9 --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-depends-on-itself/meta.py @@ -0,0 +1,18 @@ +# Copyright 2021-2023 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import logging + +from vdk.api.job_input import IJobInput +from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput + +log = logging.getLogger(__name__) + + +def run(job_input: IJobInput): + log.info(f"Dummy arguments {job_input.get_arguments()}") + + job1 = dict(job_name="job1", depends_on=["job1"]) + job2 = dict(job_name="job2", depends_on=["job1"]) + job3 = dict(job_name="job3", depends_on=["job1"]) + job4 = dict(job_name="job4", depends_on=["job2", "job3"]) + MetaJobInput().run_meta_job([job1, job2, job3, job4]) diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-duplicate-jobs/config.ini b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-duplicate-jobs/config.ini new file mode 100644 index 0000000000..3b1eb64c8a --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-duplicate-jobs/config.ini @@ -0,0 +1,2 @@ +[owner] +team = team-awesome diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-duplicate-jobs/meta.py b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-duplicate-jobs/meta.py new file mode 100644 index 0000000000..e49a34e1c8 --- /dev/null +++ 
b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-duplicate-jobs/meta.py @@ -0,0 +1,18 @@ +# Copyright 2021-2023 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import logging + +from vdk.api.job_input import IJobInput +from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput + +log = logging.getLogger(__name__) + + +def run(job_input: IJobInput): + log.info(f"Dummy arguments {job_input.get_arguments()}") + + job1 = dict(job_name="job1", depends_on=[]) + job2 = dict(job_name="job2", depends_on=["job1"]) + job3 = dict(job_name="job3", depends_on=["job1"]) + job4 = dict(job_name="job4", depends_on=["job2", "job3"]) + MetaJobInput().run_meta_job([job1, job2, job2, job3, job4]) diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-not-allowed-job-key/config.ini b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-not-allowed-job-key/config.ini new file mode 100644 index 0000000000..3b1eb64c8a --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-not-allowed-job-key/config.ini @@ -0,0 +1,2 @@ +[owner] +team = team-awesome diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-not-allowed-job-key/meta.py b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-not-allowed-job-key/meta.py new file mode 100644 index 0000000000..289231760a --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-not-allowed-job-key/meta.py @@ -0,0 +1,18 @@ +# Copyright 2021-2023 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 +import logging + +from vdk.api.job_input import IJobInput +from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput + +log = logging.getLogger(__name__) + + +def run(job_input: IJobInput): + log.info(f"Dummy arguments {job_input.get_arguments()}") + + job1 = dict(job_name="job1", depend_on=[]) + job2 = dict(job_name="job2", depends_on=["job1"]) + job3 = dict(job_name="job3", depends_on=["job1"]) + job4 = dict(job_name="job4", depends_on=["job2", "job3"]) + MetaJobInput().run_meta_job([job1, job2, job3, job4]) diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-key-type/config.ini b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-key-type/config.ini new file mode 100644 index 0000000000..3b1eb64c8a --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-key-type/config.ini @@ -0,0 +1,2 @@ +[owner] +team = team-awesome diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-key-type/meta.py b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-key-type/meta.py new file mode 100644 index 0000000000..99217712ed --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-key-type/meta.py @@ -0,0 +1,18 @@ +# Copyright 2021-2023 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 +import logging + +from vdk.api.job_input import IJobInput +from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput + +log = logging.getLogger(__name__) + + +def run(job_input: IJobInput): + log.info(f"Dummy arguments {job_input.get_arguments()}") + + job1 = dict(job_name="job1", fail_meta_job_on_error=1, depends_on=[]) + job2 = dict(job_name="job2", depends_on=["job1"]) + job3 = dict(job_name="job3", depends_on=["job1"]) + job4 = dict(job_name="job4", depends_on=["job2", "job3"]) + MetaJobInput().run_meta_job([job1, job2, job3, job4]) diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-type/config.ini b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-type/config.ini new file mode 100644 index 0000000000..3b1eb64c8a --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-type/config.ini @@ -0,0 +1,2 @@ +[owner] +team = team-awesome diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-type/meta.py b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-type/meta.py new file mode 100644 index 0000000000..b0e18088c8 --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-job-type/meta.py @@ -0,0 +1,18 @@ +# Copyright 2021-2023 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 +import logging + +from vdk.api.job_input import IJobInput +from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput + +log = logging.getLogger(__name__) + + +def run(job_input: IJobInput): + log.info(f"Dummy arguments {job_input.get_arguments()}") + + job1 = "job1" + job2 = dict(job_name="job2", depends_on=["job1"]) + job3 = dict(job_name="job3", depends_on=["job1"]) + job4 = dict(job_name="job4", depends_on=["job2", "job3"]) + MetaJobInput().run_meta_job([job1, job2, job3, job4]) diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-topological-order/config.ini b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-topological-order/config.ini new file mode 100644 index 0000000000..3b1eb64c8a --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-topological-order/config.ini @@ -0,0 +1,2 @@ +[owner] +team = team-awesome diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-topological-order/meta.py b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-topological-order/meta.py new file mode 100644 index 0000000000..229380330c --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/meta-job-wrong-topological-order/meta.py @@ -0,0 +1,18 @@ +# Copyright 2021-2023 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 +import logging + +from vdk.api.job_input import IJobInput +from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput + +log = logging.getLogger(__name__) + + +def run(job_input: IJobInput): + log.info(f"Dummy arguments {job_input.get_arguments()}") + + job1 = dict(job_name="job1", depends_on=["job2"]) + job2 = dict(job_name="job2", depends_on=["job1"]) + job3 = dict(job_name="job3", depends_on=["job1"]) + job4 = dict(job_name="job4", depends_on=["job2", "job3"]) + MetaJobInput().run_meta_job([job1, job2, job3, job4]) diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py b/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py index 2324180d8a..8c1f151315 100644 --- a/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py @@ -18,332 +18,320 @@ from werkzeug import Response -def _prepare(httpserver: PluginHTTPServer, jobs=None): - """ - :param httpserver: the pytest http server - :param jobs: list of jobs in format ('job-name', [list of http statuses to be returned], job result status) - :return: - """ - rest_api_url = httpserver.url_for("") - team_name = "team-awesome" - if jobs is None: - jobs = [ - ("job1", [200], "succeeded", 0), - ("job2", [200], "succeeded", 0), - ("job3", [200], "succeeded", 0), - ("job4", [200], "succeeded", 0), - ] +class TestMetaJob: + def _prepare(self): + rest_api_url = self.httpserver.url_for("") + team_name = "team-awesome" + if self.jobs is None: + self.jobs = [("job" + str(i), [200], "succeeded", 0) for i in range(1, 5)] + + started_jobs = dict() + + for job_name, request_responses, job_status, *execution_duration in self.jobs: + request_responses.reverse() + execution_duration = execution_duration[0] if execution_duration else 0 + + def handler(location, statuses, job_name): + def _handler_fn(r: Request): + status = statuses[0] if len(statuses) == 1 else statuses.pop() + if status < 300: + 
started_jobs[job_name] = time.time() + return Response(status=status, headers=dict(Location=location)) + + return _handler_fn + + self.httpserver.expect_request( + uri=f"/data-jobs/for-team/{team_name}/jobs/{job_name}/deployments/production/executions", + method="POST", + ).respond_with_handler( + handler( + f"/data-jobs/for-team/{team_name}/jobs/{job_name}/executions/{job_name}", + request_responses, + job_name, + ) + ) - started_jobs = dict() - - for job_name, request_responses, job_status, *execution_duration in jobs: - request_responses.reverse() - execution_duration = execution_duration[0] if execution_duration else 0 - - def handler(location, statuses, job_name): - def _handler_fn(r: Request): - status = statuses[0] if len(statuses) == 1 else statuses.pop() - if status < 300: - started_jobs[job_name] = time.time() - return Response(status=status, headers=dict(Location=location)) - - return _handler_fn - - httpserver.expect_request( - uri=f"/data-jobs/for-team/{team_name}/jobs/{job_name}/deployments/production/executions", - method="POST", - ).respond_with_handler( - handler( - f"/data-jobs/for-team/{team_name}/jobs/{job_name}/executions/{job_name}", - request_responses, - job_name, + def exec_handler(job_name, job_status, execution_duration): + def _handler_fn(r: Request): + actual_job_status = job_status + if time.time() < started_jobs.get(job_name, 0) + execution_duration: + actual_job_status = "running" + execution: DataJobExecution = DataJobExecution( + id=job_name, + job_name=job_name, + logs_url="http://url", + deployment=DataJobDeployment(), + start_time="2021-09-24T14:14:03.922Z", + status=actual_job_status, + message="foo", + ) + response_data = json.dumps(execution.to_dict(), indent=4) + return Response( + response_data, + status=200, + headers=None, + content_type="application/json", + ) + + return _handler_fn + + self.httpserver.expect_request( + uri=f"/data-jobs/for-team/{team_name}/jobs/{job_name}/executions/{job_name}", + method="GET", + 
).respond_with_handler( + exec_handler(job_name, job_status, execution_duration) ) - ) - - def exec_handler(job_name, job_status, execution_duration): - def _handler_fn(r: Request): - actual_job_status = job_status - if time.time() < started_jobs.get(job_name, 0) + execution_duration: - actual_job_status = "running" - execution: DataJobExecution = DataJobExecution( - id=job_name, - job_name=job_name, - logs_url="http://url", - deployment=DataJobDeployment(), - start_time="2021-09-24T14:14:03.922Z", - status=actual_job_status, - message="foo", - ) - response_data = json.dumps(execution.to_dict(), indent=4) - return Response( - response_data, - status=200, - headers=None, - content_type="application/json", - ) - return _handler_fn - - httpserver.expect_request( - uri=f"/data-jobs/for-team/{team_name}/jobs/{job_name}/executions/{job_name}", - method="GET", - ).respond_with_handler(exec_handler(job_name, job_status, execution_duration)) - - def exec_list_handler(job_name): - def _handler_fn(r: Request): - execution: DataJobExecution = DataJobExecution( - id=f"{job_name}-latest", - job_name=job_name, - logs_url="http://url", - deployment=DataJobDeployment(), - start_time="2021-09-24T14:14:03.922Z", - status="succeeded", - message="foo", - ) - response_data = json.dumps(execution.to_dict(), indent=4) - return Response( - [response_data], - status=200, - headers=None, - content_type="application/json", - ) + def exec_list_handler(job_name): + def _handler_fn(r: Request): + execution: DataJobExecution = DataJobExecution( + id=f"{job_name}-latest", + job_name=job_name, + logs_url="http://url", + deployment=DataJobDeployment(), + start_time="2021-09-24T14:14:03.922Z", + status="succeeded", + message="foo", + ) + response_data = json.dumps(execution.to_dict(), indent=4) + return Response( + [response_data], + status=200, + headers=None, + content_type="application/json", + ) + + return _handler_fn + + self.httpserver.expect_request( + 
uri=f"/data-jobs/for-team/{team_name}/jobs/{job_name}/executions", + method="GET", + ).respond_with_handler(exec_list_handler(job_name)) + + return rest_api_url + + def _set_up(self, jobs=None, additional_env_vars=None): + self.httpserver = PluginHTTPServer() + self.httpserver.start() + self.jobs = jobs + self.api_url = self._prepare() + self.env_vars = {"VDK_CONTROL_SERVICE_REST_API_URL": self.api_url} + if additional_env_vars is not None: + self.env_vars.update(additional_env_vars) + + def _run_meta_job(self, meta_job_name): + with mock.patch.dict( + os.environ, + self.env_vars, + ): + # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a away to simulate vdk command + # and mock large parts of it - e.g passed our own plugins + result: Result = self.runner.invoke( + ["run", jobs_path_from_caller_directory(meta_job_name)] + ) - return _handler_fn - - httpserver.expect_request( - uri=f"/data-jobs/for-team/{team_name}/jobs/{job_name}/executions", - method="GET", - ).respond_with_handler(exec_list_handler(job_name)) - - return rest_api_url - - -def test_meta_job(httpserver: PluginHTTPServer): - api_url = _prepare(httpserver) - - with mock.patch.dict( - os.environ, - {"VDK_CONTROL_SERVICE_REST_API_URL": api_url}, - ): - # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a away to simulate vdk command - # and mock large parts of it - e.g passed our own plugins - runner = CliEntryBasedTestRunner(plugin_entry) - - result: Result = runner.invoke( - ["run", jobs_path_from_caller_directory("meta-job")] - ) - cli_assert_equal(0, result) - - -def test_meta_job_error(httpserver: PluginHTTPServer): - jobs = [ - ("job1", [200], "succeeded"), - ("job2", [200], "succeeded"), - ("job3", [200], "platform_error"), - ("job4", [200], "succeeded"), - ] - api_url = _prepare(httpserver, jobs) - - with mock.patch.dict( - os.environ, - {"VDK_CONTROL_SERVICE_REST_API_URL": api_url}, - ): - # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a away to simulate vdk 
command - # and mock large parts of it - e.g passed our own plugins - runner = CliEntryBasedTestRunner(plugin_entry) - - result: Result = runner.invoke( - ["run", jobs_path_from_caller_directory("meta-job")] - ) - cli_assert_equal(1, result) - - -def test_meta_job_fail_false(httpserver: PluginHTTPServer): - jobs = [ - ("job1", [200], "succeeded"), - ("job2", [200], "platform_error"), - ("job3", [200], "succeeded"), - ("job4", [200], "succeeded"), - ] - api_url = _prepare(httpserver, jobs) - - with mock.patch.dict( - os.environ, - {"VDK_CONTROL_SERVICE_REST_API_URL": api_url}, - ): - # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a away to simulate vdk command - # and mock large parts of it - e.g passed our own plugins - runner = CliEntryBasedTestRunner(plugin_entry) - - result: Result = runner.invoke( - ["run", jobs_path_from_caller_directory("meta-job")] - ) - cli_assert_equal(0, result) - - -def test_meta_job_conflict(httpserver: PluginHTTPServer): - jobs = [ - ("job1", [409, 200], "succeeded"), - ("job2", [500, 200], "succeeded"), - ("job3", [200], "succeeded"), - ("job4", [200], "succeeded"), - ] - api_url = _prepare(httpserver, jobs) - - with mock.patch.dict( - os.environ, - { - "VDK_CONTROL_SERVICE_REST_API_URL": api_url, + return result + + def _assert_meta_job_fails_with_error(self, result, error): + with mock.patch.dict( + os.environ, + self.env_vars, + ): + assert isinstance(result.exception, error) + # no other request should be tried as the Meta Job fails + assert len(self.httpserver.log) == 0 + + def test_meta_job(self): + self._set_up() + with mock.patch.dict( + os.environ, + self.env_vars, + ): + self.runner = CliEntryBasedTestRunner(plugin_entry) + result = self._run_meta_job("meta-job") + cli_assert_equal(0, result) + self.httpserver.stop() + + def test_meta_job_error(self): + jobs = [ + ("job1", [200], "succeeded"), + ("job2", [200], "succeeded"), + ("job3", [200], "platform_error"), + ("job4", [200], "succeeded"), + ] + 
self._set_up(jobs) + with mock.patch.dict( + os.environ, + self.env_vars, + ): + self.runner = CliEntryBasedTestRunner(plugin_entry) + result = self._run_meta_job("meta-job") + cli_assert_equal(1, result) + self.httpserver.stop() + + def test_meta_job_fail_false(self): + jobs = [ + ("job1", [200], "succeeded"), + ("job2", [200], "platform_error"), + ("job3", [200], "succeeded"), + ("job4", [200], "succeeded"), + ] + self._set_up(jobs) + with mock.patch.dict( + os.environ, + self.env_vars, + ): + self.runner = CliEntryBasedTestRunner(plugin_entry) + result = self._run_meta_job("meta-job") + cli_assert_equal(0, result) + self.httpserver.stop() + + def test_meta_job_conflict(self): + jobs = [ + ("job1", [409, 200], "succeeded"), + ("job2", [500, 200], "succeeded"), + ("job3", [200], "succeeded"), + ("job4", [200], "succeeded"), + ] + env_vars = { "VDK_META_JOBS_DELAYED_JOBS_RANDOMIZED_ADDED_DELAY_SECONDS": "0", "VDK_META_JOBS_DELAYED_JOBS_MIN_DELAY_SECONDS": "0", - }, - ): - # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a away to simulate vdk command - # and mock large parts of it - e.g passed our own plugins - runner = CliEntryBasedTestRunner(plugin_entry) - - result: Result = runner.invoke( - ["run", jobs_path_from_caller_directory("meta-job")] - ) - cli_assert_equal(0, result) - - -def test_meta_job_cannot_start_job(httpserver: PluginHTTPServer): - jobs = [ - ("job1", [401, 200], "succeeded"), - ("job2", [200], "succeeded"), - ("job3", [200], "succeeded"), - ("job4", [200], "succeeded"), - ] - api_url = _prepare(httpserver, jobs) - - with mock.patch.dict( - os.environ, - {"VDK_CONTROL_SERVICE_REST_API_URL": api_url}, - ): - # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a away to simulate vdk command - # and mock large parts of it - e.g passed our own plugins - runner = CliEntryBasedTestRunner(plugin_entry) - - result: Result = runner.invoke( - ["run", jobs_path_from_caller_directory("meta-job")] - ) - cli_assert_equal(1, result) - # we 
should have 2 requests in the log, one to get a list - # of all executions, and one for the failing data job - # no other request should be tried as the meta job fails - assert len(httpserver.log) == 2 - - -def test_meta_job_long_running(httpserver: PluginHTTPServer): - jobs = [ - ("job1", [200], "succeeded", 3), # execution duration is 3 seconds - ("job2", [200], "succeeded"), - ("job3", [200], "succeeded"), - ("job4", [200], "succeeded"), - ] - api_url = _prepare(httpserver, jobs) - - with mock.patch.dict( - os.environ, - { - "VDK_CONTROL_SERVICE_REST_API_URL": api_url, + } + self._set_up(jobs, env_vars) + with mock.patch.dict( + os.environ, + self.env_vars, + ): + self.runner = CliEntryBasedTestRunner(plugin_entry) + result = self._run_meta_job("meta-job") + cli_assert_equal(0, result) + self.httpserver.stop() + + def test_meta_job_cannot_start_job(self): + jobs = [ + ("job1", [401, 200], "succeeded"), + ("job2", [200], "succeeded"), + ("job3", [200], "succeeded"), + ("job4", [200], "succeeded"), + ] + self._set_up(jobs) + with mock.patch.dict( + os.environ, + self.env_vars, + ): + self.runner = CliEntryBasedTestRunner(plugin_entry) + result = self._run_meta_job("meta-job") + cli_assert_equal(1, result) + # we should have 2 requests in the log, one to get a list + # of all executions, and one for the failing data job + # no other request should be tried as the meta job fails + assert len(self.httpserver.log) == 2 + self.httpserver.stop() + + def test_meta_job_long_running(self): + jobs = [ + ("job1", [200], "succeeded", 3), # execution duration is 3 seconds + ("job2", [200], "succeeded"), + ("job3", [200], "succeeded"), + ("job4", [200], "succeeded"), + ] + env_vars = { # we set 5 seconds more than execution duration of 3 set above "VDK_META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS": "5", "VDK_META_JOBS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS": "0", - }, - ): - # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a away to simulate vdk command - # and mock 
large parts of it - e.g passed our own plugins - runner = CliEntryBasedTestRunner(plugin_entry) - - result: Result = runner.invoke( - ["run", jobs_path_from_caller_directory("meta-job")] - ) - cli_assert_equal(0, result) - job1_requests = [ - req - for req, res in httpserver.log - if req.method == "GET" and req.base_url.endswith("job1") - ] - # We have 1 call during start, 1 call at finish and 1 call that returns running and 1 that returns the final - # status. For total of 4 - # NB: test (verification) that requires that deep implementation details knowledge is - # not a good idea but we need to verify that we are not hammering the API Server somehow ... - assert len(job1_requests) == 4 - - # let's make sure something else is not generating more requests then expected - # if implementation is changed the number below would likely change. - # If the new count is not that big we can edit it here to pass the test, - # if the new count is too big, we have an issue that need to be investigated. 
- assert len(httpserver.log) == 21 - - -def test_meta_job_circular_dependency(httpserver: PluginHTTPServer): - jobs = [ - ("job1", [200], "succeeded"), - ("job2", [200], "succeeded"), - ("job3", [200], "succeeded"), - ("job4", [200], "succeeded"), - ] - api_url = _prepare(httpserver, jobs) - - with mock.patch.dict( - os.environ, - {"VDK_CONTROL_SERVICE_REST_API_URL": api_url}, - ): - # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a away to simulate vdk command - # and mock large parts of it - e.g passed our own plugins - runner = CliEntryBasedTestRunner(plugin_entry) - - result: Result = runner.invoke( - ["run", jobs_path_from_caller_directory("meta-job-circular-dep")] - ) - cli_assert_equal(1, result) - # no other request should be tried as the meta job fails - assert isinstance(result.exception, UserCodeError) - assert len(httpserver.log) == 0 - - -def test_meta_job_concurrent_running_jobs_limit(httpserver: PluginHTTPServer): - jobs = [("job" + str(i), [200], "succeeded", 1) for i in range(1, 8)] - api_url = _prepare(httpserver, jobs) - - with mock.patch.dict( - os.environ, - { - "VDK_CONTROL_SERVICE_REST_API_URL": api_url, + } + self._set_up(jobs, env_vars) + with mock.patch.dict( + os.environ, + self.env_vars, + ): + self.runner = CliEntryBasedTestRunner(plugin_entry) + result = self._run_meta_job("meta-job") + cli_assert_equal(0, result) + job1_requests = [ + req + for req, res in self.httpserver.log + if req.method == "GET" and req.base_url.endswith("job1") + ] + # We have 1 call during start, 1 call at finish and 1 call that returns running and 1 that returns the final + # status. For a total of 4 + # NB: test (verification) that requires that deep implementation details knowledge is + # not a good idea but we need to verify that we are not hammering the API Server somehow ...
+ assert len(job1_requests) == 4 + + # let's make sure something else is not generating more requests than expected + # if implementation is changed the number below would likely change. + # If the new count is not that big we can edit it here to pass the test, + # if the new count is too big, we have an issue that needs to be investigated. + assert len(self.httpserver.log) == 21 + self.httpserver.stop() + + def test_meta_job_concurrent_running_jobs_limit(self): + jobs = [("job" + str(i), [200], "succeeded", 1) for i in range(1, 8)] + env_vars = { "VDK_META_JOBS_MAX_CONCURRENT_RUNNING_JOBS": "2", "VDK_META_JOBS_DELAYED_JOBS_MIN_DELAY_SECONDS": "1", "VDK_META_JOBS_DELAYED_JOBS_RANDOMIZED_ADDED_DELAY_SECONDS": "1", "VDK_META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS": "1", - }, - ): - # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a way to simulate vdk command - # and mock large parts of it - e.g passed our own plugins - runner = CliEntryBasedTestRunner(plugin_entry) - - result: Result = runner.invoke( - ["run", jobs_path_from_caller_directory("meta-job-exceed-limit")] - ) - - expected_max_running_jobs = int( - os.getenv("VDK_META_JOBS_MAX_CONCURRENT_RUNNING_JOBS", "2") - ) - # keep track of the number of running jobs at any given time - running_jobs = set() - for request, response in httpserver.log: - if "executions" in request.path: - if request.method == "POST": - job_name = request.path.split("/jobs/")[1].split("/")[0] - running_jobs.add(job_name) - assert ( - len(running_jobs) <= expected_max_running_jobs - ) # assert that max concurrent running jobs is not exceeded - if request.method == "GET": - execution = json.loads(response.response[0]) - if execution["status"] == "succeeded": - running_jobs.discard(execution["job_name"]) - cli_assert_equal(0, result) - # assert that all the jobs finished successfully - assert len(running_jobs) == 0 + } + self._set_up(jobs, env_vars) + with mock.patch.dict( + os.environ, + self.env_vars, + ): + self.runner =
CliEntryBasedTestRunner(plugin_entry) + result = self._run_meta_job("meta-job-exceed-limit") + expected_max_running_jobs = int( + os.getenv("VDK_META_JOBS_MAX_CONCURRENT_RUNNING_JOBS", "2") + ) + # keep track of the number of running jobs at any given time + running_jobs = set() + for request, response in self.httpserver.log: + if "executions" in request.path: + if request.method == "POST": + job_name = request.path.split("/jobs/")[1].split("/")[0] + running_jobs.add(job_name) + assert ( + len(running_jobs) <= expected_max_running_jobs + ) # assert that max concurrent running jobs is not exceeded + if request.method == "GET": + execution = json.loads(response.response[0]) + if execution["status"] == "succeeded": + running_jobs.discard(execution["job_name"]) + cli_assert_equal(0, result) + # assert that all the jobs finished successfully + assert len(running_jobs) == 0 + self.httpserver.stop() + + def _test_meta_job_validation(self, meta_job_name): + self._set_up() + with mock.patch.dict( + os.environ, + self.env_vars, + ): + self.runner = CliEntryBasedTestRunner(plugin_entry) + result = self._run_meta_job(meta_job_name) + cli_assert_equal(1, result) + self._assert_meta_job_fails_with_error(result, UserCodeError) + self.httpserver.stop() + + def test_meta_job_circular_dependency(self): + self._test_meta_job_validation("meta-job-circular-dep") + + def test_meta_job_depends_on_itself(self): + self._test_meta_job_validation("meta-job-depends-on-itself") + + def test_meta_job_duplicate_jobs(self): + self._test_meta_job_validation("meta-job-duplicate-jobs") + + def test_meta_job_not_allowed_job_key(self): + self._test_meta_job_validation("meta-job-not-allowed-job-key") + + def test_meta_job_wrong_job_key_type(self): + self._test_meta_job_validation("meta-job-wrong-job-key-type") + + def test_meta_job_wrong_job_type(self): + self._test_meta_job_validation("meta-job-wrong-job-type") + + def test_meta_job_wrong_topological_order(self): + 
self._test_meta_job_validation("meta-job-wrong-topological-order")