diff --git a/projects/vdk-plugins/vdk-meta-jobs/README.md b/projects/vdk-plugins/vdk-meta-jobs/README.md index 2afdc8a03e..08cb9bdbcf 100644 --- a/projects/vdk-plugins/vdk-meta-jobs/README.md +++ b/projects/vdk-plugins/vdk-meta-jobs/README.md @@ -28,6 +28,7 @@ def run(job_input): "job_name": "name-of-job", "team_name": "team-of-job", "fail_meta_job_on_error": True or False, + "arguments": {"key1": value1, "key2": value2}, "depends_on": [name-of-job1, name-of-job2] }, ... @@ -38,7 +39,8 @@ def run(job_input): When defining a job to be run, the following attributes are supported: * **job_name**: required, the name of the data job * **team_name**: optional, the team of the data job. If omitted, it will use the meta job's team -* **fail_meta_job_on_error**: optional, default is true. if true, the meta job will abort and fail if the orchestrated job fails, if false, meta job won't fail and continue. +* **fail_meta_job_on_error**: optional, default is true. If true, the meta job will abort and fail if the orchestrated job fails; if false, the meta job will continue. +* **arguments**: optional, the arguments that are passed to the underlying orchestrated data job. * **depends_on**: required (can be empty), list of other jobs that the orchestrated job depends on. The job will not be started until the jobs listed in depends_on have finished. @@ -65,6 +67,7 @@ JOBS_RUN_ORDER = [ "job_name": "job1", "team_name": "team-awesome", "fail_meta_job_on_error": True, + "arguments": {}, "depends_on": [] }, @@ -72,18 +75,21 @@ JOBS_RUN_ORDER = [ "job_name": "job2", "team_name": "team-awesome", "fail_meta_job_on_error": True, + "arguments": {}, "depends_on": ["job1"] }, { "job_name": "job3", "team_name": "team-awesome", "fail_meta_job_on_error": True, + "arguments": {}, "depends_on": ["job1"] }, { "job_name": "job4", "team_name": "team-awesome", "fail_meta_job_on_error": True, + "arguments": {}, "depends_on": ["job1"] }, @@ -91,12 +97,14 @@ JOBS_RUN_ORDER = [ "job_name": "job5", "team_name": "team-awesome", "fail_meta_job_on_error": True, + "arguments": {}, "depends_on": ["job3"] }, { "job_name": "job6", "team_name": "team-awesome", "fail_meta_job_on_error": True, + "arguments": {}, "depends_on": ["job3"] }, ] diff --git a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/api/meta_job.py b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/api/meta_job.py index 52ac940539..516e9dab56 100644 --- a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/api/meta_job.py +++ b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/api/meta_job.py @@ -16,6 +16,7 @@ class SingleJob: job_name: str team_name: str = None fail_meta_job_on_error: bool = True + arguments: dict = None depends_on: List[str] = field(default_factory=list) diff --git a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/cached_data_job_executor.py b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/cached_data_job_executor.py index c853a8571c..a79a6169b5 100644 --- a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/cached_data_job_executor.py +++ b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/cached_data_job_executor.py @@ -2,12 +2,12 @@ # SPDX-License-Identifier: Apache-2.0 import json import logging -import os import time from typing import Dict from typing import Optional import urllib3.exceptions as url_exception +from vdk.api.job_input import IJobArguments from vdk.internal.core.errors import ErrorMessage from vdk.internal.core.errors import UserCodeError from 
vdk.plugin.meta_jobs.meta import IDataJobExecutor @@ -44,7 +44,7 @@ def start_job(self, job_name: str) -> None: job = self.__get_job(job_name) job.start_attempt += 1 execution_id = self.start_new_job_execution( - job_name=job.job_name, team_name=job.team_name + job_name=job.job_name, team_name=job.team_name, arguments=job.arguments ) log.info(f"Starting new data job execution with id {execution_id}") job.execution_id = execution_id @@ -135,7 +135,9 @@ def get_all_jobs(self): def get_currently_running_jobs(self): return [j for j in self._jobs_cache.values() if j.status in ACTIVE_JOB_STATUSES] - def start_new_job_execution(self, job_name: str, team_name: str) -> str: + def start_new_job_execution( + self, job_name: str, team_name: str, arguments: IJobArguments = None + ) -> str: """ Start a new data job execution. The stages of the process are: @@ -152,6 +154,7 @@ def start_new_job_execution(self, job_name: str, team_name: str) -> str: :param job_name: name of the data job to be executed :param team_name: name of the owning team + :param arguments: arguments of the data job :return: id of the started job execution """ current_retries = 0 @@ -163,7 +166,7 @@ def start_new_job_execution(self, job_name: str, team_name: str) -> str: while current_retries < ALLOWED_RETRIES: try: - execution_id = self._executor.start_job(job_name, team_name) + execution_id = self._executor.start_job(job_name, team_name, arguments) return execution_id except url_exception.TimeoutError as e: log.info( diff --git a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/dag_validator.py b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/dag_validator.py index 19c8233631..34b14c8bb3 100644 --- a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/dag_validator.py +++ b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/dag_validator.py @@ -1,6 +1,7 @@ # Copyright 2023-2023 VMware, Inc. # SPDX-License-Identifier: Apache-2.0 import graphlib +import json import logging from collections import namedtuple from typing import Dict @@ -14,7 +15,13 @@ ERROR = Error( TYPE="type", PERMISSION="permission", REQUIREMENT="requirement", CONFLICT="conflict" ) -allowed_job_keys = {"job_name", "team_name", "fail_meta_job_on_error", "depends_on"} +allowed_job_keys = { + "job_name", + "team_name", + "fail_meta_job_on_error", + "depends_on", + "arguments", +} required_job_keys = {"job_name", "depends_on"} @@ -37,7 +44,7 @@ def validate(self, jobs: List[Dict]): log.info("Successfully validated the DAG!") def _raise_error( - self, jobs: List[Dict], error_type: str, reason: str, countermeasures: str + self, error_type: str, reason: str, countermeasures: str, jobs: List[str] = "" ): raise UserCodeError( ErrorMessage( @@ -53,35 +60,50 @@ def _validate_no_duplicates(self, jobs: List[Dict]): duplicated_jobs = list({job["job_name"] for job in jobs if jobs.count(job) > 1}) if duplicated_jobs: self._raise_error( - duplicated_jobs, ERROR.CONFLICT, f"There are some duplicated jobs: {duplicated_jobs}.", f"Remove the duplicated jobs from the list - each job can appear in the jobs list at most once. 
" f"Duplicated jobs: {duplicated_jobs}.", + duplicated_jobs, ) def _validate_job(self, job: Dict): + self._validate_job_type(job) self._validate_allowed_and_required_keys(job) self._validate_job_name(job) - self._validate_dependencies(job) - self._validate_team_name(job) - self._validate_fail_meta_job_on_error(job) - log.info(f"Successfully validated job: {job['job_name']}") + job_name = job.get("job_name") + self._validate_dependencies(job_name, job["depends_on"]) + if "team_name" in job: + self._validate_team_name(job_name, job["team_name"]) + if "fail_meta_job_on_error" in job: + self._validate_fail_meta_job_on_error( + job_name, job["fail_meta_job_on_error"] + ) + if "arguments" in job: + self._validate_arguments(job_name, job["arguments"]) + log.info(f"Successfully validated job: {job_name}") + + def _validate_job_type(self, job: Dict): + if not isinstance(job, dict): + self._raise_error( + ERROR.TYPE, + "The job type is not dict.", + f"Change the Data Job type. Current type is {type(job)}. Expected type is dict.", + ["".join(list(job))], + ) def _validate_allowed_and_required_keys(self, job: Dict): - forbidden_keys = [key for key in job.keys() if key not in allowed_job_keys] - if forbidden_keys: + disallowed_keys = [key for key in job.keys() if key not in allowed_job_keys] + if disallowed_keys: self._raise_error( - list(job), ERROR.PERMISSION, "One or more job dict keys are not allowed.", - f"Remove the forbidden Data Job Dict keys. " - f"Keys {forbidden_keys} are forbidden. Allowed keys: {allowed_job_keys}.", + f"Remove the disallowed Data Job Dict keys. " + f"Keys {disallowed_keys} are not allowed. Allowed keys: {allowed_job_keys}.", ) missing_keys = [key for key in required_job_keys if key not in job] if missing_keys: self._raise_error( - list(job), ERROR.REQUIREMENT, "One or more job dict required keys are missing.", f"Add the missing required Data Job Dict keys. Keys {missing_keys} " @@ -91,54 +113,74 @@ def _validate_allowed_and_required_keys(self, job: Dict): def _validate_job_name(self, job: Dict): if not isinstance(job["job_name"], str): self._raise_error( - list(job), ERROR.TYPE, "The type of the job dict key job_name is not string.", f"Change the Data Job Dict value of job_name. " f"Current type is {type(job['job_name'])}. Expected type is string.", + ["".join(list(job))], ) - def _validate_dependencies(self, job: Dict): - if not (isinstance(job["depends_on"], List)): + def _validate_dependencies(self, job_name: str, dependencies: List[str]): + if not (isinstance(dependencies, List)): self._raise_error( - list(job), ERROR.TYPE, "The type of the job dict depends_on key is not list.", f"Check the Data Job Dict type of the depends_on key. Current type " - f"is {type(job['depends_on'])}. Expected type is list.", + f"is {type(dependencies)}. Expected type is list.", + [job_name], ) non_string_dependencies = [ - pred for pred in job["depends_on"] if not isinstance(pred, str) + pred for pred in dependencies if not isinstance(pred, str) ] if non_string_dependencies: self._raise_error( - list(job), ERROR.TYPE, "One or more items of the job dependencies list are not strings.", f"Check the Data Job Dict values of the depends_on list. " f"There are some non-string values: {non_string_dependencies}. 
Expected type is string.", + [job_name], ) - def _validate_team_name(self, job: Dict): - if "team_name" in job and not isinstance(job["team_name"], str): + def _validate_team_name(self, job_name: str, team_name: str): + if not isinstance(team_name, str): self._raise_error( - list(job), ERROR.TYPE, "The type of the job dict key team_name is not string.", f"Change the Data Job Dict value of team_name. " - f"Current type is {type(job['team_name'])}. Expected type is string.", + f"Current type is {type(team_name)}. Expected type is string.", + [job_name], ) - def _validate_fail_meta_job_on_error(self, job: Dict): - if "fail_meta_job_on_error" in job and not isinstance( - (job["fail_meta_job_on_error"]), bool - ): + def _validate_fail_meta_job_on_error( + self, job_name: str, fail_meta_job_on_error: bool + ): + if not isinstance(fail_meta_job_on_error, bool): self._raise_error( - list(job), ERROR.TYPE, "The type of the job dict key fail_meta_job_on_error is not bool (True/False).", f"Change the Data Job Dict value of fail_meta_job_on_error. Current type" - f" is {type(job['fail_meta_job_on_error'])}. Expected type is bool.", + f" is {type(fail_meta_job_on_error)}. Expected type is bool.", + [job_name], + ) + + def _validate_arguments(self, job_name: str, job_args: dict): + if not isinstance(job_args, dict): + self._raise_error( + ERROR.TYPE, + "The type of the job dict key arguments is not dict.", + f"Change the Data Job Dict value of arguments. " + f"Current type is {type(job_args)}. Expected type is dict.", + [job_name], + ) + try: + json.dumps(job_args) + except TypeError as e: + self._raise_error( + ERROR.TYPE, + str(e), + f"Change the Data Job Dict value of arguments. " + f"Current type is {type(job_args)}, but the value is not serializable as JSON.", + [job_name], ) def _check_dag_cycles(self, jobs: List[Dict]): @@ -151,8 +193,8 @@ def _check_dag_cycles(self, jobs: List[Dict]): topological_sorter.prepare() except graphlib.CycleError as e: self._raise_error( - e.args[1][:-1], ERROR.CONFLICT, "There is a cycle in the DAG.", f"Change the depends_on list of the jobs that participate in the detected cycle: {e.args[1]}.", + e.args[1][:-1], ) diff --git a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta.py b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta.py index a9216970a7..4a61ef2fc8 100644 --- a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta.py +++ b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta.py @@ -6,14 +6,16 @@ from typing import Optional from taurus_datajob_api import DataJobExecution +from vdk.api.job_input import IJobArguments from vdk.plugin.meta_jobs.api import meta_job class IDataJobExecutor(abc.ABC): @abc.abstractmethod - def start_job(self, job_name: str, team_name: str): + def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None): """ Start an execution of a data job and return its execution id + :param arguments: arguments of the data job :param job_name: :param team_name: :return: execution id diff --git a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_dag.py b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_dag.py index 4732895c99..c0d7e700a6 100644 --- a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_dag.py +++ b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_dag.py @@ -22,7 +22,11 @@ class MetaJobsDag: - def __init__(self, team_name: str, meta_config: MetaPluginConfiguration): + def __init__( + self, + team_name: str, + meta_config: 
MetaPluginConfiguration, + ): self._team_name = team_name self._topological_sorter = TopologicalSorter() self._delayed_starting_jobs = TimeBasedQueue( @@ -49,6 +53,7 @@ def build_dag(self, jobs: List[Dict]): job["job_name"], job.get("team_name", self._team_name), job.get("fail_meta_job_on_error", True), + job.get("arguments", None), ) self._job_executor.register_job(trackable_job) self._topological_sorter.add(trackable_job.job_name, *job["depends_on"]) diff --git a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/remote_data_job.py b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/remote_data_job.py index 98f6f6e46e..4338de4524 100644 --- a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/remote_data_job.py +++ b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/remote_data_job.py @@ -14,6 +14,7 @@ from taurus_datajob_api import DataJobExecutionRequest from taurus_datajob_api import DataJobsExecutionApi from urllib3 import Retry +from vdk.api.job_input import IJobArguments from vdk.plugin.control_api_auth.authentication import Authentication log = logging.getLogger(__name__) @@ -49,12 +50,14 @@ def __init__( job_name: str, team_name: str, rest_api_url: str, + arguments: IJobArguments = None, timeout: int = 5, # TODO: Set reasonable default **kwargs, ) -> None: - self.job_name = job_name - self.team_name = team_name - self._rest_api_url = rest_api_url + self.__job_name = job_name + self.__team_name = team_name + self.__rest_api_url = rest_api_url + self.__arguments = arguments self.timeout = timeout self.deployment_id = "production" # currently multiple deployments are not supported so this remains hardcoded self.auth: Optional[Authentication] = kwargs.pop("auth", None) @@ -78,7 +81,7 @@ def __init__( self.__execution_api = self._get_execution_api() - def start_job_execution(self, **request_kwargs) -> str: + def start_job_execution(self) -> str: """ Triggers a manual Datajob execution. 
@@ -86,11 +89,11 @@ def start_job_execution(self, **request_kwargs) -> str: """ execution_request = DataJobExecutionRequest( started_by="meta-job", # TODO: specify name of meta job - args=request_kwargs, + args=self.__arguments, ) _, _, headers = self.__execution_api.data_job_execution_start_with_http_info( - team_name=self.team_name, - job_name=self.job_name, + team_name=self.__team_name, + job_name=self.__job_name, deployment_id=self.deployment_id, data_job_execution_request=execution_request, _request_timeout=self.timeout, @@ -107,8 +110,8 @@ def cancel_job_execution(self, execution_id: str) -> None: :param execution_id: ID of the job execution """ self.__execution_api.data_job_execution_cancel( - team_name=self.team_name, - job_name=self.job_name, + team_name=self.__team_name, + job_name=self.__job_name, execution_id=execution_id, _request_timeout=self.timeout, ) @@ -121,7 +124,9 @@ def get_job_execution_log(self, execution_id: str) -> str: :return: job execution logs """ return self.__execution_api.data_job_logs_download( - team_name=self.team_name, job_name=self.job_name, execution_id=execution_id + team_name=self.__team_name, + job_name=self.__job_name, + execution_id=execution_id, ).logs def get_job_execution_status(self, execution_id: str) -> DataJobExecution: @@ -132,7 +137,9 @@ def get_job_execution_status(self, execution_id: str) -> DataJobExecution: :return: The execution status object listing details about the status of this particular execution """ job_execution: DataJobExecution = self.__execution_api.data_job_execution_read( - team_name=self.team_name, job_name=self.job_name, execution_id=execution_id + team_name=self.__team_name, + job_name=self.__job_name, + execution_id=execution_id, ) return job_execution @@ -144,7 +151,7 @@ def get_job_executions(self) -> Optional[List[DataJobExecution]]: :return: A list of DataJobExecution objects for the available executions. 
""" job_execution_list = self.__execution_api.data_job_execution_list( - team_name=self.team_name, job_name=self.job_name + team_name=self.__team_name, job_name=self.__job_name ) return job_execution_list @@ -200,7 +207,7 @@ def wait_for_job( return job_status def _get_execution_api(self): - rest_api_url = self._rest_api_url + rest_api_url = self.__rest_api_url config = Configuration(host=rest_api_url, api_key=None) config.connection_pool_maxsize = self.http_connection_pool_maxsize diff --git a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/remote_data_job_executor.py b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/remote_data_job_executor.py index 7c5813867c..c2ab3dc675 100644 --- a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/remote_data_job_executor.py +++ b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/remote_data_job_executor.py @@ -4,15 +4,18 @@ from typing import Optional from taurus_datajob_api import DataJobExecution +from vdk.api.job_input import IJobArguments from vdk.internal.control.configuration.vdk_config import VDKConfig from vdk.plugin.meta_jobs.meta import IDataJobExecutor from vdk.plugin.meta_jobs.remote_data_job import RemoteDataJob class RemoteDataJobExecutor(IDataJobExecutor): - def start_job(self, job_name: str, team_name: str): + def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None): vdk_cfg = VDKConfig() - job = RemoteDataJob(job_name, team_name, vdk_cfg.control_service_rest_api_url) + job = RemoteDataJob( + job_name, team_name, vdk_cfg.control_service_rest_api_url, arguments + ) return job.start_job_execution() # catch error on 409 diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-arguments/config.ini b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-arguments/config.ini new file mode 100644 index 0000000000..3b1eb64c8a --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-arguments/config.ini @@ -0,0 +1,2 @@ +[owner] +team = team-awesome diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-arguments/dag.py b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-arguments/dag.py new file mode 100644 index 0000000000..af89479d68 --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-arguments/dag.py @@ -0,0 +1,23 @@ +# Copyright 2021-2023 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 +import logging + +from vdk.api.job_input import IJobInput +from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput + +log = logging.getLogger(__name__) + + +def run(job_input: IJobInput): + log.info(f"Dummy arguments {job_input.get_arguments()}") + + job1 = dict(job_name="job1", depends_on=[]) + job2 = dict( + job_name="job2", + depends_on=["job1"], + fail_meta_job_on_error=False, + arguments={"table_name": "test_table", "counter": 123}, + ) + job3 = dict(job_name="job3", depends_on=["job1"]) + job4 = dict(job_name="job4", depends_on=["job2", "job3"]) + MetaJobInput().run_meta_job([job1, job2, job3, job4]) diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-empty-arguments/config.ini b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-empty-arguments/config.ini new file mode 100644 index 0000000000..3b1eb64c8a --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-empty-arguments/config.ini @@ -0,0 +1,2 @@ +[owner] +team = team-awesome diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-empty-arguments/dag.py b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-empty-arguments/dag.py new file mode 100644 index 0000000000..f36e03f338 --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-empty-arguments/dag.py @@ -0,0 +1,23 @@ +# Copyright 2021-2023 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import logging + +from vdk.api.job_input import IJobInput +from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput + +log = logging.getLogger(__name__) + + +def run(job_input: IJobInput): + log.info(f"Dummy arguments {job_input.get_arguments()}") + + job1 = dict(job_name="job1", depends_on=[]) + job2 = dict( + job_name="job2", + depends_on=["job1"], + fail_meta_job_on_error=False, + arguments={}, + ) + job3 = dict(job_name="job3", depends_on=["job1"]) + job4 = dict(job_name="job4", depends_on=["job2", "job3"]) + MetaJobInput().run_meta_job([job1, job2, job3, job4]) diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-wrong-job-arguments-type/config.ini b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-wrong-job-arguments-type/config.ini new file mode 100644 index 0000000000..3b1eb64c8a --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-wrong-job-arguments-type/config.ini @@ -0,0 +1,2 @@ +[owner] +team = team-awesome diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-wrong-job-arguments-type/dag.py b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-wrong-job-arguments-type/dag.py new file mode 100644 index 0000000000..ee7a22cf33 --- /dev/null +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/jobs/dag-wrong-job-arguments-type/dag.py @@ -0,0 +1,18 @@ +# Copyright 2021-2023 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 +import logging + +from vdk.api.job_input import IJobInput +from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput + +log = logging.getLogger(__name__) + + +def run(job_input: IJobInput): + log.info(f"Dummy arguments {job_input.get_arguments()}") + + job1 = dict(job_name="job1", depends_on=[]) + job2 = dict(job_name="job2", depends_on=["job1"], arguments=5) + job3 = dict(job_name="job3", depends_on=["job1"]) + job4 = dict(job_name="job4", depends_on=["job2", "job3"]) + MetaJobInput().run_meta_job([job1, job2, job3, job4]) diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py b/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py index 8c1f151315..cb7c1a28f7 100644 --- a/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py +++ b/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py @@ -127,7 +127,10 @@ def _run_meta_job(self, meta_job_name): # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a way to simulate the vdk command # and mock large parts of it - e.g. passing our own plugins result: Result = self.runner.invoke( - ["run", jobs_path_from_caller_directory(meta_job_name)] + [ + "run", + jobs_path_from_caller_directory(meta_job_name), + ] ) return result @@ -327,6 +330,36 @@ def test_meta_job_duplicate_jobs(self): def test_meta_job_not_allowed_job_key(self): self._test_meta_job_validation("meta-job-not-allowed-job-key") + def test_meta_job_wrong_job_arguments_type(self): + self._test_meta_job_validation("dag-wrong-job-arguments-type") + + def test_meta_job_arguments(self): + self._set_up() + with mock.patch.dict( + os.environ, + self.env_vars, + ): + self.runner = CliEntryBasedTestRunner(plugin_entry) + result = self._run_meta_job("dag-arguments") + cli_assert_equal(0, result) + job2_arguments = self._get_job_arguments("job2") + assert len(job2_arguments) == 2 + assert job2_arguments == {"table_name": "test_table", "counter": 123} + self.httpserver.stop() + + def test_meta_job_empty_arguments(self): + self._set_up() + with mock.patch.dict( + os.environ, + self.env_vars, + ): + self.runner = CliEntryBasedTestRunner(plugin_entry) + result = self._run_meta_job("dag-empty-arguments") + cli_assert_equal(0, result) + job2_arguments = self._get_job_arguments("job2") + assert len(job2_arguments) == 0 + self.httpserver.stop() + def test_meta_job_wrong_job_key_type(self): self._test_meta_job_validation("meta-job-wrong-job-key-type") @@ -335,3 +368,12 @@ def test_meta_job_wrong_job_type(self): def test_meta_job_wrong_topological_order(self): self._test_meta_job_validation("meta-job-wrong-topological-order") + + def _get_job_arguments(self, job_name: str): + job_post_req = [ + req + for req, res in self.httpserver.log + if req.method == "POST" + and req.path.split("/jobs/")[1].split("/")[0] == job_name + ] + return job_post_req[0].json["args"]
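
Note on the new `arguments` contract, which is spread across several hunks above: the DAG validator accepts a job's `arguments` value only when it is a `dict` that `json.dumps` can serialize (see the `_validate_arguments` hunk), and `RemoteDataJob.start_job_execution` then forwards that value as the `args` field of the `DataJobExecutionRequest`. The sketch below restates that validation rule as a standalone, runnable snippet; the helper name `is_valid_job_arguments` is illustrative only and not part of the plugin.

```python
import json


def is_valid_job_arguments(job_args) -> bool:
    # Mirrors the two checks in _validate_arguments (dag_validator.py):
    # 1) the value must be a dict;
    # 2) json.dumps must not raise TypeError for it.
    if not isinstance(job_args, dict):
        return False
    try:
        json.dumps(job_args)
        return True
    except TypeError:
        return False


# JSON-compatible dicts pass (cf. the dag-arguments test job):
assert is_valid_job_arguments({"table_name": "test_table", "counter": 123})
# An empty dict is accepted too (cf. the dag-empty-arguments test job):
assert is_valid_job_arguments({})
# A non-dict value is rejected (cf. dag-wrong-job-arguments-type, arguments=5):
assert not is_valid_job_arguments(5)
# A dict holding a non-serializable value fails the json.dumps check:
assert not is_valid_job_arguments({"callback": object()})
```

Under this rule the three new test jobs behave as the tests above assert: `dag-arguments` and `dag-empty-arguments` validate and run, while `dag-wrong-job-arguments-type` fails validation before any execution is started.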