diff --git a/projects/vdk-plugins/airflow-provider-vdk/tests/hooks/test_vdkhook.py b/projects/vdk-plugins/airflow-provider-vdk/tests/hooks/test_vdkhook.py index 0aaa37c405..92fb6d94a2 100644 --- a/projects/vdk-plugins/airflow-provider-vdk/tests/hooks/test_vdkhook.py +++ b/projects/vdk-plugins/airflow-provider-vdk/tests/hooks/test_vdkhook.py @@ -20,6 +20,7 @@ def read_access_token(self) -> str: class DummyStatusResponse: status = 200 data = b"ddf" + headers = {"Location": "job-execution-id-01"} def getheader(self, str): return "json" @@ -39,7 +40,7 @@ def setUp(self): @mock.patch("taurus_datajob_api.api_client.ApiClient.call_api") def test_start_job_execution(self, mock_call_api): - mock_call_api.return_value = (None, None, {"Location": "job-execution-id-01"}) + mock_call_api.return_value = DummyStatusResponse() self.hook.start_job_execution() diff --git a/projects/vdk-plugins/airflow-provider-vdk/tests/operators/test_vdkoperator.py b/projects/vdk-plugins/airflow-provider-vdk/tests/operators/test_vdkoperator.py index 7a5f3b4776..cd8c2d7902 100644 --- a/projects/vdk-plugins/airflow-provider-vdk/tests/operators/test_vdkoperator.py +++ b/projects/vdk-plugins/airflow-provider-vdk/tests/operators/test_vdkoperator.py @@ -9,6 +9,10 @@ from vdk_provider.operators.vdk import VDKOperator +class DummyApiResponse: + headers = {"Location": "job-execution-id-01"} + + def call_api_return_func(*args, **kwargs): if ( args[0] @@ -21,7 +25,7 @@ def call_api_return_func(*args, **kwargs): "deployment_id": "production", } ): - return None, None, {"Location": "job-execution-id-01"} + return DummyApiResponse() elif ( args[0] == "/data-jobs/for-team/{team_name}/jobs/{job_name}/executions/{execution_id}" diff --git a/projects/vdk-plugins/airflow-provider-vdk/vdk_provider/hooks/vdk.py b/projects/vdk-plugins/airflow-provider-vdk/vdk_provider/hooks/vdk.py index b5cec0542b..728e24a48b 100644 --- a/projects/vdk-plugins/airflow-provider-vdk/vdk_provider/hooks/vdk.py +++ b/projects/vdk-plugins/airflow-provider-vdk/vdk_provider/hooks/vdk.py @@ -104,13 +104,13 @@ def start_job_execution(self, **request_kwargs) -> str: started_by="airflow-provider-vdk", args=request_kwargs, ) - _, _, headers = self.__execution_api.data_job_execution_start_with_http_info( + headers = self.__execution_api.data_job_execution_start_with_http_info( team_name=self.team_name, job_name=self.job_name, deployment_id=self.deployment_id, data_job_execution_request=execution_request, _request_timeout=self.timeout, - ) + ).headers log.debug(f"Received headers: {headers}") job_execution_id = os.path.basename(headers["Location"]) diff --git a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/cached_data_job_executor.py b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/cached_data_job_executor.py index d76fc1c64d..3619dc3515 100644 --- a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/cached_data_job_executor.py +++ b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/cached_data_job_executor.py @@ -62,7 +62,10 @@ def start_job(self, job_name: str) -> None: job = self.__get_job(job_name) job.start_attempt += 1 execution_id = self.start_new_job_execution( - job_name=job.job_name, team_name=job.team_name, arguments=job.arguments + job_name=job.job_name, + team_name=job.team_name, + started_by=job.details.get("started_by"), + arguments=job.arguments, ) log.info(f"Starting new data job execution with id {execution_id}") job.execution_id = execution_id @@ -138,6 +141,20 @@ def status(self, job_name: str) -> str: log.debug(f"Job status: {job}") return job.status + def execution_type(self, job_name: str, team_name: str, execution_id: str) -> str: + """ + Gets the execution type of a job. + + :param execution_id: the execution id of the job + :param team_name: the name of the owning team + :param job_name: the name of the job + :return: the job execution type (manual/scheduled) + """ + details = self._executor.details_job(job_name, team_name, execution_id) + log.debug(f"Job execution type: {details.get('type')}") + # the default value for execution type is manual + return details.get("type", "manual") + def get_finished_job_names(self): """ :return: list of the names of all the finalized jobs @@ -175,7 +192,11 @@ def get_currently_running_jobs(self): return [j for j in self._jobs_cache.values() if j.status in ACTIVE_JOB_STATUSES] def start_new_job_execution( - self, job_name: str, team_name: str, arguments: IJobArguments = None + self, + job_name: str, + team_name: str, + started_by: str = None, + arguments: IJobArguments = None, ) -> str: """ Start a new data job execution. @@ -193,6 +214,7 @@ def start_new_job_execution( :param job_name: name of the data job to be executed :param team_name: name of the owning team + :param started_by: the execution type and the name of the DAG job :param arguments: arguments of the data job :return: id of the started job execution """ @@ -205,7 +227,9 @@ def start_new_job_execution( while current_retries < ALLOWED_RETRIES: try: - execution_id = self._executor.start_job(job_name, team_name, arguments) + execution_id = self._executor.start_job( + job_name, team_name, started_by, arguments + ) return execution_id except url_exception.TimeoutError as e: log.info( diff --git a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag.py b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag.py index 540dc4a647..6f4a99af89 100644 --- a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag.py +++ b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag.py @@ -22,7 +22,13 @@ class Dag: - def __init__(self, team_name: str, dags_config: DagPluginConfiguration): + def __init__( + self, + team_name: str, + dags_config: DagPluginConfiguration, + job_name: str = None, + execution_id: str = None, + ): """ This module deals with all the DAG-related operations such as build and execute. @@ -47,6 +53,32 @@ def __init__(self, team_name: str, dags_config: DagPluginConfiguration): time_between_status_check_seconds=dags_config.dags_time_between_status_check_seconds(), ) self._dag_validator = DagValidator() + if job_name is not None: + if execution_id is not None: + try: + self._started_by = ( + self._job_executor.execution_type( + job_name, team_name, execution_id + ) + + "/" + + job_name + ) + except ApiException as e: + if e.status == 404: + log.debug( + f"Job {job_name} of team {team_name} with execution id {execution_id} failed. " + f"Local job runs return 404 status when getting the execution type: {e}" + ) + else: + log.info( + f"Unexpected error while checking for job execution type for job {job_name} " + f"with execution id {execution_id} of team {team_name}: {e}" + ) + self._started_by = f"manual/{job_name}" + else: + self._started_by = f"manual/{job_name}" + else: + self._started_by = "manual/default" def build_dag(self, jobs: List[Dict]): """ @@ -62,7 +94,9 @@ def build_dag(self, jobs: List[Dict]): job.get("team_name", self._team_name), job.get("fail_dag_on_error", True), job.get("arguments", None), + job.get("details", {}), ) + trackable_job.details = {"started_by": self._started_by} self._job_executor.register_job(trackable_job) self._topological_sorter.add(trackable_job.job_name, *job["depends_on"]) diff --git a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_plugin.py b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_plugin.py index e5ad1ded26..de4e573749 100644 --- a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_plugin.py +++ b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_plugin.py @@ -7,6 +7,7 @@ from vdk.internal.builtin_plugins.config.job_config import JobConfigKeys from vdk.internal.builtin_plugins.run.job_context import JobContext from vdk.internal.core.config import ConfigurationBuilder +from vdk.internal.core.statestore import CommonStoreKeys from vdk.plugin.dag import dag_runner from vdk.plugin.dag.dag_plugin_configuration import add_definitions from vdk.plugin.dag.dag_plugin_configuration import DagPluginConfiguration @@ -23,6 +24,10 @@ def run_job(context: JobContext) -> None: dag_runner.DAG_CONFIG = DagPluginConfiguration( context.core_context.configuration ) + dag_runner.JOB_NAME = context.name + dag_runner.EXECUTION_ID = context.core_context.state.get( + CommonStoreKeys.EXECUTION_ID + ) @staticmethod @hookimpl diff --git a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_runner.py b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_runner.py index 4f029601af..33e6ffd017 100644 --- a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_runner.py +++ b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_runner.py @@ -11,6 +11,8 @@ TEAM_NAME: Optional[str] = None DAG_CONFIG = None +JOB_NAME: Optional[str] = None +EXECUTION_ID: Optional[str] = None log = logging.getLogger(__name__) @@ -31,7 +33,7 @@ def run_dag(self, jobs: List[Dict]): :param jobs: the list of jobs that are part of the DAG :return: """ - dag = Dag(TEAM_NAME, DAG_CONFIG) + dag = Dag(TEAM_NAME, DAG_CONFIG, JOB_NAME, EXECUTION_ID) dag.build_dag(jobs) dag.execute_dag() log.info(f"DAG summary:\n{dag}") diff --git a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dags.py b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dags.py index c136568f36..1f92b4a1fa 100644 --- a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dags.py +++ b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dags.py @@ -16,9 +16,16 @@ class IDataJobExecutor(abc.ABC): """ @abc.abstractmethod - def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None): + def start_job( + self, + job_name: str, + team_name: str, + started_by: str = None, + arguments: IJobArguments = None, + ): """ Start an execution of a data job and returns its execution id + :param started_by: :param arguments: :param job_name: :param team_name: @@ -40,11 +47,11 @@ def status_job(self, job_name: str, team_name: str, execution_id: str): @abc.abstractmethod def details_job(self, job_name: str, team_name: str, execution_id: str) -> dict: """ - Get the current status of a data job execution + Get the current details of a data job execution :param job_name: :param team_name: :param execution_id: - :return: status in string as defined by Control Service API + :return: details in string as defined by Control Service API """ pass diff --git a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/remote_data_job.py b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/remote_data_job.py index 024a23c769..7db062dd05 100644 --- a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/remote_data_job.py +++ b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/remote_data_job.py @@ -50,6 +50,7 @@ def __init__( job_name: str, team_name: str, rest_api_url: str, + started_by: str = None, arguments: IJobArguments = None, timeout: int = 5, # TODO: Set reasonable default **kwargs, @@ -67,6 +68,7 @@ def __init__( self.__team_name = team_name self.__rest_api_url = rest_api_url self.__arguments = arguments + self.__started_by = started_by self.timeout = timeout self.deployment_id = "production" # currently multiple deployments are not supported so this remains hardcoded self.auth: Optional[Authentication] = kwargs.pop("auth", None) @@ -92,21 +94,21 @@ def __init__( def start_job_execution(self) -> str: """ - Triggers a manual Datajob execution. + Triggers a Datajob execution. :param: request_kwargs: Request arguments to be included with the HTTP request """ execution_request = DataJobExecutionRequest( - started_by="dag", # TODO: specify name of dag + started_by=self.__started_by, args=self.__arguments, ) - _, _, headers = self.__execution_api.data_job_execution_start_with_http_info( + headers = self.__execution_api.data_job_execution_start_with_http_info( team_name=self.__team_name, job_name=self.__job_name, deployment_id=self.deployment_id, data_job_execution_request=execution_request, _request_timeout=self.timeout, - ) + ).headers log.debug(f"Received headers: {headers}") job_execution_id = os.path.basename(headers["Location"]) @@ -138,7 +140,7 @@ def get_job_execution_log(self, execution_id: str) -> str: execution_id=execution_id, ).logs - def get_job_execution_status(self, execution_id: str) -> DataJobExecution: + def get_job_execution_details(self, execution_id: str) -> DataJobExecution: """ Returns the execution status for a particular job execution. @@ -186,7 +188,7 @@ def wait_for_job( time.sleep(wait_seconds) try: - job_execution = self.get_job_execution_status(execution_id) + job_execution = self.get_job_execution_details(execution_id) job_status = job_execution.status except Exception as err: log.info("VDK Control Service returned error: %s", err) diff --git a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/remote_data_job_executor.py b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/remote_data_job_executor.py index 999c521109..520f8b2407 100644 --- a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/remote_data_job_executor.py +++ b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/remote_data_job_executor.py @@ -15,10 +15,20 @@ class RemoteDataJobExecutor(IDataJobExecutor): This module is responsible for executing remote Data Jobs. """ - def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None): + def start_job( + self, + job_name: str, + team_name: str, + started_by: str = None, + arguments: IJobArguments = None, + ): vdk_cfg = VDKConfig() job = RemoteDataJob( - job_name, team_name, vdk_cfg.control_service_rest_api_url, arguments + job_name, + team_name, + vdk_cfg.control_service_rest_api_url, + started_by, + arguments, ) return job.start_job_execution() # catch error on 409 @@ -26,14 +36,14 @@ def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = No def status_job(self, job_name: str, team_name: str, execution_id: str) -> str: vdk_cfg = VDKConfig() job = RemoteDataJob(job_name, team_name, vdk_cfg.control_service_rest_api_url) - status = job.get_job_execution_status(execution_id) - return status.status + details = job.get_job_execution_details(execution_id) + return details.status def details_job(self, job_name: str, team_name: str, execution_id: str) -> dict: vdk_cfg = VDKConfig() job = RemoteDataJob(job_name, team_name, vdk_cfg.control_service_rest_api_url) - status = job.get_job_execution_status(execution_id) - return status.to_dict() + details = job.get_job_execution_details(execution_id) + return details.to_dict() def job_executions_list( self, job_name: str, team_name: str diff --git a/projects/vdk-plugins/vdk-dag/tests/test_dag.py b/projects/vdk-plugins/vdk-dag/tests/test_dag.py index 5bf29b3354..afa354c02a 100644 --- a/projects/vdk-plugins/vdk-dag/tests/test_dag.py +++ b/projects/vdk-plugins/vdk-dag/tests/test_dag.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 import json import os +import re import time from datetime import date from datetime import datetime @@ -57,7 +58,7 @@ def dags_max_concurrent_running_jobs(self): class TestDAG: - def _prepare(self): + def _prepare(self, dag_name=None): rest_api_url = self.httpserver.url_for("") team_name = "team-awesome" if self.jobs is None: @@ -102,6 +103,7 @@ def _handler_fn(r: Request): start_time="2021-09-24T14:14:03.922Z", status=actual_job_status, message="foo", + started_by="manual/" + dag_name, ) response_data = json.dumps( execution.to_dict(), indent=4, default=json_serial @@ -122,6 +124,45 @@ def _handler_fn(r: Request): exec_handler(job_name, job_status, execution_duration) ) + class MatchExecutionID: + def __eq__(self, other): + return ( + re.match( + r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-\d+$", + other.split("/")[-1], + ) + is not None + ) + + def exec_id_handler(job_name): + def _handler_fn(r: Request): + execution: DataJobExecution = DataJobExecution( + id=job_name, + job_name=job_name, + logs_url="http://url", + deployment=DataJobDeployment(), + start_time="2021-09-24T14:14:03.922Z", + status="succeeded", + message="foo", + started_by="manual/" + dag_name, + ) + response_data = json.dumps( + execution.to_dict(), indent=4, default=json_serial + ) + return Response( + response_data, + status=200, + headers=None, + content_type="application/json", + ) + + return _handler_fn + + self.httpserver.expect_request( + uri=MatchExecutionID(), + method="GET", + ).respond_with_handler(exec_id_handler(job_name)) + def exec_list_handler(job_name): def _handler_fn(r: Request): execution: DataJobExecution = DataJobExecution( @@ -132,6 +173,7 @@ def _handler_fn(r: Request): start_time="2021-09-24T14:14:03.922Z", status="succeeded", message="foo", + started_by="manual/" + dag_name, ) response_data = json.dumps( [execution.to_dict()], indent=4, default=json_serial @@ -152,11 +194,11 @@ def _handler_fn(r: Request): return rest_api_url - def _set_up(self, jobs=None, additional_env_vars=None): + def _set_up(self, jobs=None, additional_env_vars=None, dag_name=None): self.httpserver = PluginHTTPServer() self.httpserver.start() self.jobs = jobs - self.api_url = self._prepare() + self.api_url = self._prepare(dag_name) self.env_vars = {"VDK_CONTROL_SERVICE_REST_API_URL": self.api_url} if additional_env_vars is not None: self.env_vars.update(additional_env_vars) @@ -188,13 +230,14 @@ def _assert_dag_fails_with_error(self, result, error): assert len(self.httpserver.log) == 0 def test_dag(self): - self._set_up() + dag = "dag" + self._set_up(dag_name=dag) with mock.patch.dict( os.environ, self.env_vars, ): self.runner = CliEntryBasedTestRunner(dag_plugin) - result = self._run_dag("dag") + result = self._run_dag(dag) cli_assert_equal(0, result) self.httpserver.stop() @@ -205,13 +248,14 @@ def test_dag_error(self): ("job3", [200], "platform_error"), ("job4", [200], "succeeded"), ] - self._set_up(jobs) + dag = "dag" + self._set_up(jobs, dag_name=dag) with mock.patch.dict( os.environ, self.env_vars, ): self.runner = CliEntryBasedTestRunner(dag_plugin) - result = self._run_dag("dag") + result = self._run_dag(dag) cli_assert_equal(1, result) self.httpserver.stop() @@ -222,13 +266,14 @@ def test_dag_fail_false(self): ("job3", [200], "succeeded"), ("job4", [200], "succeeded"), ] - self._set_up(jobs) + dag = "dag" + self._set_up(jobs, dag_name=dag) with mock.patch.dict( os.environ, self.env_vars, ): self.runner = CliEntryBasedTestRunner(dag_plugin) - result = self._run_dag("dag") + result = self._run_dag(dag) cli_assert_equal(0, result) self.httpserver.stop() @@ -239,17 +284,18 @@ def test_dag_conflict(self): ("job3", [200], "succeeded"), ("job4", [200], "succeeded"), ] + dag = "dag" env_vars = { "VDK_DAGS_DELAYED_JOBS_RANDOMIZED_ADDED_DELAY_SECONDS": "0", "VDK_DAGS_DELAYED_JOBS_MIN_DELAY_SECONDS": "0", } - self._set_up(jobs, env_vars) + self._set_up(jobs, env_vars, dag) with mock.patch.dict( os.environ, self.env_vars, ): self.runner = CliEntryBasedTestRunner(dag_plugin) - result = self._run_dag("dag") + result = self._run_dag(dag) cli_assert_equal(0, result) self.httpserver.stop() @@ -260,18 +306,20 @@ def test_dag_cannot_start_job(self): ("job3", [200], "succeeded"), ("job4", [200], "succeeded"), ] - self._set_up(jobs) + dag = "dag" + self._set_up(jobs, dag_name=dag) with mock.patch.dict( os.environ, self.env_vars, ): self.runner = CliEntryBasedTestRunner(dag_plugin) - result = self._run_dag("dag") + result = self._run_dag(dag) cli_assert_equal(1, result) - # we should have 2 requests in the log, one to get a list - # of all executions, and one for the failing data job - # no other request should be tried as the DAG fails - assert len(self.httpserver.log) == 2 + # we should have 3 requests in the log, one to get + # the execution type of the dag,a list of all + # executions and one for the failing data job no + # other request should be tried as the DAG fails + assert len(self.httpserver.log) == 3 self.httpserver.stop() def test_dag_long_running(self): @@ -281,17 +329,18 @@ def test_dag_long_running(self): ("job3", [200], "succeeded"), ("job4", [200], "succeeded"), ] + dag = "dag" # we set 5 seconds more than execution duration of 3 set above dummy_config.dags_time_between_status_check_seconds_value = 5 dummy_config.dags_dag_execution_check_time_period_seconds_value = 0 - self._set_up(jobs, []) + self._set_up(jobs, dag_name=dag) with mock.patch.dict( os.environ, self.env_vars, ): self.runner = CliEntryBasedTestRunner(dag_plugin) - result = self._run_dag("dag") + result = self._run_dag(dag) cli_assert_equal(0, result) job1_requests = [ req @@ -308,25 +357,32 @@ def test_dag_long_running(self): # if implementation is changed the number below would likely change. # If the new count is not that big we can edit it here to pass the test, # if the new count is too big, we have an issue that need to be investigated. - assert len(self.httpserver.log) == 21 + assert len(self.httpserver.log) == 22 self.httpserver.stop() - """ def test_dag_concurrent_running_jobs_limit(self): jobs = [("job" + str(i), [200], "succeeded", 1) for i in range(1, 8)] + dag = "dag-exceed-limit" dummy_config.dags_max_concurrent_running_jobs_value = 2 dummy_config.dags_delayed_jobs_min_delay_seconds_value = 1 dummy_config.dags_delayed_jobs_randomized_added_delay_seconds_value = 1 dummy_config.dags_time_between_status_check_seconds_value = 1 - self._set_up(jobs, []) + env_vars = { + "VDK_DAGS_MAX_CONCURRENT_RUNNING_JOBS": "2", + "VDK_DAGS_DELAYED_JOBS_RANDOMIZED_ADDED_DELAY_SECONDS": "1", + "VDK_DAGS_DELAYED_JOBS_MIN_DELAY_SECONDS": "1", + "VDK_DAGS_TIME_BETWEEN_STATUS_CHECK_SECONDS_VALUE": "1", + } + + self._set_up(jobs, env_vars, dag) with mock.patch.dict( os.environ, self.env_vars, ): self.runner = CliEntryBasedTestRunner(dag_plugin) - result = self._run_dag("dag-exceed-limit") + result = self._run_dag(dag) expected_max_running_jobs = int( os.getenv("VDK_DAGS_MAX_CONCURRENT_RUNNING_JOBS", "2") ) @@ -350,7 +406,25 @@ def test_dag_concurrent_running_jobs_limit(self): # assert that all the jobs finished successfully assert len(running_jobs) == 0 self.httpserver.stop() - """ + + def test_dag_execution_type_propagation(self): + dag = "dag" + self._set_up(dag_name=dag) + with mock.patch.dict( + os.environ, + self.env_vars, + ): + self.runner = CliEntryBasedTestRunner(dag_plugin) + result = self._run_dag(dag) + for request, response in self.httpserver.log: + if "executions" in request.path: + if request.method == "GET": + execution = json.loads(response.response[0]) + if isinstance(execution, list): + execution = execution[0] + assert execution["started_by"] == "manual/" + dag + cli_assert_equal(0, result) + self.httpserver.stop() def _test_dag_validation(self, dag_name): self._set_up() @@ -380,13 +454,14 @@ def test_dag_wrong_job_arguments_type(self): self._test_dag_validation("dag-wrong-job-arguments-type") def test_dag_arguments(self): - self._set_up() + dag = "dag-arguments" + self._set_up(dag_name=dag) with mock.patch.dict( os.environ, self.env_vars, ): self.runner = CliEntryBasedTestRunner(dag_plugin) - result = self._run_dag("dag-arguments") + result = self._run_dag(dag) cli_assert_equal(0, result) job2_arguments = self._get_job_arguments("job2") assert len(job2_arguments) == 2 @@ -394,13 +469,14 @@ def test_dag_arguments(self): self.httpserver.stop() def test_dag_empty_arguments(self): - self._set_up() + dag = "dag-empty-arguments" + self._set_up(dag_name=dag) with mock.patch.dict( os.environ, self.env_vars, ): self.runner = CliEntryBasedTestRunner(dag_plugin) - result = self._run_dag("dag-empty-arguments") + result = self._run_dag(dag) cli_assert_equal(0, result) job2_arguments = self._get_job_arguments("job2") assert len(job2_arguments) == 0