diff --git a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/cached_data_job_executor.py b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/cached_data_job_executor.py index 4438827107..69c82fb266 100644 --- a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/cached_data_job_executor.py +++ b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/cached_data_job_executor.py @@ -12,6 +12,7 @@ from vdk.internal.core.errors import UserCodeError from vdk.plugin.dag.dags import IDataJobExecutor from vdk.plugin.dag.dags import TrackableJob +from vdk.plugin.dag.exception import DagJobExecutionException from vdk.plugin.dag.remote_data_job import JobStatus log = logging.getLogger(__name__) @@ -91,22 +92,12 @@ def finalize_job(self, job_name): job.details = details log.info( f"Finished data job {job_name}:\n" - f"start_time: {details['start_time']}\n" - f"end_time: {details.get('end_time')}\n" - f"status: {details['status']}\n" - f"message: {details['message']}" + f" start_time: {details['start_time']}\n" + f" end_time: {details.get('end_time')}\n" + f" status: {details['status']}\n" ) if job.status != JobStatus.SUCCEEDED.value and job.fail_dag_on_error: - raise UserCodeError( - ErrorMessage( - "", - "DAG failed due to a Data Job failure.", - f"Data Job {job_name} failed. See details: {details}", - "The rest of the jobs in the DAG will not be started " - "and the DAG will fail.", - "Investigate the error in the job or re-try again.", - ) - ) + raise DagJobExecutionException(job_name, details) @staticmethod def __get_printable_details(details): diff --git a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_validator.py b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_validator.py index 641ca0bf91..64b188b63c 100644 --- a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_validator.py +++ b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_validator.py @@ -7,8 +7,7 @@ from typing import Dict from typing import List -from vdk.internal.core.errors import ErrorMessage -from vdk.internal.core.errors import UserCodeError +from vdk.plugin.dag.exception import DagValidationException log = logging.getLogger(__name__) Error = namedtuple("Error", ["TYPE", "PERMISSION", "REQUIREMENT", "CONFLICT"]) @@ -45,23 +44,10 @@ def validate(self, jobs: List[Dict]): self._check_dag_cycles(jobs) log.debug("Successfully validated the DAG!") - def _raise_error( - self, error_type: str, reason: str, countermeasures: str, jobs: List[str] = "" - ): - raise UserCodeError( - ErrorMessage( - "", - "DAG failed due to a Data Job validation failure.", - f"There is a {error_type} error with job(s) {jobs}. " + reason, - "The DAG will not be built and will fail.", - countermeasures, - ) - ) - def _validate_no_duplicates(self, jobs: List[Dict]): duplicated_jobs = list({job["job_name"] for job in jobs if jobs.count(job) > 1}) if duplicated_jobs: - self._raise_error( + raise DagValidationException( ERROR.CONFLICT, f"There are some duplicated jobs: {duplicated_jobs}.", f"Remove the duplicated jobs from the list - each job can appear in the jobs list at most once. " @@ -85,100 +71,111 @@ def _validate_job(self, job: Dict): def _validate_job_type(self, job: Dict): if not isinstance(job, dict): - self._raise_error( + jobs = ["".join(list(job))] + raise DagValidationException( ERROR.TYPE, "The job type is not dict.", f"Change the Data Job type. Current type is {type(job)}. Expected type is dict.", - ["".join(list(job))], + jobs, ) def _validate_allowed_and_required_keys(self, job: Dict): disallowed_keys = [key for key in job.keys() if key not in allowed_job_keys] if disallowed_keys: - self._raise_error( + raise DagValidationException( ERROR.PERMISSION, "One or more job dict keys are not allowed.", f"Remove the disallowed Data Job Dict keys. " f"Keys {disallowed_keys} are not allowed. Allowed keys: {allowed_job_keys}.", + None, ) missing_keys = [key for key in required_job_keys if key not in job] if missing_keys: - self._raise_error( + raise DagValidationException( ERROR.REQUIREMENT, "One or more job dict required keys are missing.", f"Add the missing required Data Job Dict keys. Keys {missing_keys} " f"are missing. Required keys: {required_job_keys}.", + None, ) def _validate_job_name(self, job: Dict): if not isinstance(job["job_name"], str): - self._raise_error( + jobs = ["".join(list(job))] + raise DagValidationException( ERROR.TYPE, "The type of the job dict key job_name is not string.", f"Change the Data Job Dict value of job_name. " f"Current type is {type(job['job_name'])}. Expected type is string.", - ["".join(list(job))], + jobs, ) def _validate_dependencies(self, job_name: str, dependencies: List[str]): if not (isinstance(dependencies, List)): - self._raise_error( + jobs = [job_name] + raise DagValidationException( ERROR.TYPE, "The type of the job dict depends_on key is not list.", f"Check the Data Job Dict type of the depends_on key. Current type " f"is {type(dependencies)}. Expected type is list.", - [job_name], + jobs, ) non_string_dependencies = [ pred for pred in dependencies if not isinstance(pred, str) ] if non_string_dependencies: - self._raise_error( + jobs1 = [job_name] + raise DagValidationException( ERROR.TYPE, "One or more items of the job dependencies list are not strings.", f"Check the Data Job Dict values of the depends_on list. " f"There are some non-string values: {non_string_dependencies}. Expected type is string.", - [job_name], + jobs1, ) def _validate_team_name(self, job_name: str, team_name: str): if not isinstance(team_name, str): - self._raise_error( + jobs = [job_name] + raise DagValidationException( ERROR.TYPE, "The type of the job dict key job_name is not string.", f"Change the Data Job Dict value of team_name. " f"Current type is {type(team_name)}. Expected type is string.", - [job_name], + jobs, ) def _validate_fail_dag_on_error(self, job_name: str, fail_dag_on_error: bool): if not isinstance(fail_dag_on_error, bool): - self._raise_error( + jobs = [job_name] + raise DagValidationException( ERROR.TYPE, "The type of the job dict key fail_dag_on_error is not bool (True/False).", f"Change the Data Job Dict value of fail_dag_on_error. Current type" f" is {type(fail_dag_on_error)}. Expected type is bool.", - [job_name], + jobs, ) def _validate_arguments(self, job_name: str, job_args: dict): if not isinstance(job_args, dict): - self._raise_error( + jobs = [job_name] + raise DagValidationException( ERROR.TYPE, "The type of the job dict key arguments is not dict.", f"Change the Data Job Dict value of arguments. " f"Current type is {type(job_args)}. Expected type is dict.", - [job_name], + jobs, ) try: json.dumps(job_args) except TypeError as e: - self._raise_error( + reason = str(e) + jobs1 = [job_name] + raise DagValidationException( ERROR.TYPE, - str(e), + reason, f"Change the Data Job Dict value of arguments. " f"Current type is {type(job_args)} but not serializable as JSON.", - [job_name], + jobs1, ) def _check_dag_cycles(self, jobs: List[Dict]): @@ -190,9 +187,10 @@ def _check_dag_cycles(self, jobs: List[Dict]): # Preparing the sorter raises CycleError if cycles exist topological_sorter.prepare() except graphlib.CycleError as e: - self._raise_error( + jobs1 = e.args[1][:-1] + raise DagValidationException( ERROR.CONFLICT, "There is a cycle in the DAG.", f"Change the depends_on list of the jobs that participate in the detected cycle: {e.args[1]}.", - e.args[1][:-1], + jobs1, ) diff --git a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/exception.py b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/exception.py new file mode 100644 index 0000000000..6034bb19b5 --- /dev/null +++ b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/exception.py @@ -0,0 +1,85 @@ +# Copyright 2023-2024 Broadcom +# SPDX-License-Identifier: Apache-2.0 +from typing import List +from typing import Optional + +from vdk.internal.core.error_classifiers import ResolvableBy +from vdk.internal.core.error_classifiers import ResolvableByActual +from vdk.internal.core.errors import BaseVdkError + + +class DagValidationException(BaseVdkError): + """ + Exception raised for errors during DAG data job validation. + + :param error_type: The type of error encountered. + :param reason: Explanation of why the error occurred. + :param countermeasures: Suggested actions to resolve the error. + :param jobs: List of jobs associated with the error, defaults to None. + """ + + def __init__( + self, error_type: str, reason: str, countermeasures: str, jobs: List[str] = None + ): + self.jobs = jobs if jobs is not None else [] + self.error_type = error_type + self.reason = reason + self.countermeasures = countermeasures + + jobs_formatted = ", ".join(self.jobs) if self.jobs else "N/A" + message = ( + f"DAG Validation Error:\n" + f" - Error Type: {self.error_type}\n" + f" - Affected Jobs: {jobs_formatted}\n" + f" - Reason: {self.reason}\n" + f" - Countermeasures: {self.countermeasures}" + ) + super().__init__(ResolvableByActual.USER, ResolvableBy.USER_ERROR, message) + + +class DagJobExecutionException(BaseVdkError): + """ + Exception raised when an execution of a job within a DAG fails. + + :param str job_name: The name of the job that failed. + :param dict details: Any details relevant to the failure, optional. + """ + + def __init__(self, job_name: str, details: Optional[dict] = None): + self.job_name = job_name + self.details = details if details is not None else {} + + details_formatted = self.format_details(self.details) + + message = ( + f"Failure in DAG execution - Job '{self.job_name}' failed.\n" + f" - Failed Job details:\n{details_formatted}" + ) + # regardless of the failed job resolvable type, the DAG job always fails with user error + # since the DAG itself didn't fail due to platform error. + # The failed job itself might be platform error in this case the platform would still be alerted. + # While the user is responsible for looking at the DAG itself. + super().__init__(ResolvableByActual.USER, ResolvableBy.USER_ERROR, message) + + @staticmethod + def format_details(details: dict) -> str: + if not details: + return "None" + + def format_dict(d, indent=0, indent_prefix=" ", initial_prefix=" "): + formatted_str = "" + current_indent = initial_prefix + indent_prefix * indent + for key, value in d.items(): + if not value: + continue + formatted_str += f"{current_indent}{key}: " + if isinstance(value, dict) and indent < 1: + # Print nested dictionaries only up to the 2nd level + formatted_str += "\n" + format_dict( + value, indent + 1, indent_prefix + ) + else: + formatted_str += f"{value}\n" + return formatted_str + + return format_dict(details) diff --git a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py index cc07b08028..e58f61c545 100644 --- a/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py +++ b/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py @@ -135,7 +135,8 @@ def _determine_status_without_summary(self, result: int) -> str: return JobStatus.SUCCEEDED.value def _update_message_with_summary(self, content: str): - self._message = {"summary": json.loads(content), "logs": self._log_file} + self._message = json.loads(content) + self._message["logs"] = self._log_file class LocalDataJobRunException(Exception): @@ -158,9 +159,10 @@ def __find_job_path(job_name: str): candidates = [ os.getcwd(), ] + # TODO: expose this using the plugin configuration mechanisms (which infers also from env. vars among others) candidates += [ part - for part in os.environ.get("DAG_LOCAL_RUN_JOB_PATH", "").split(":") + for part in os.environ.get("DAGS_LOCAL_RUN_JOB_PATH", "").split(":") if part ] diff --git a/projects/vdk-plugins/vdk-dag/tests/test_dag.py b/projects/vdk-plugins/vdk-dag/tests/test_dag.py index 3242105912..7d0d8639a1 100644 --- a/projects/vdk-plugins/vdk-dag/tests/test_dag.py +++ b/projects/vdk-plugins/vdk-dag/tests/test_dag.py @@ -15,6 +15,7 @@ from vdk.internal.core.errors import UserCodeError from vdk.plugin.dag import dag_plugin from vdk.plugin.dag import dag_runner +from vdk.plugin.dag.exception import DagValidationException from vdk.plugin.test_utils.util_funcs import cli_assert_equal from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner from vdk.plugin.test_utils.util_funcs import jobs_path_from_caller_directory @@ -436,7 +437,7 @@ def _test_dag_validation(self, dag_name): self.runner = CliEntryBasedTestRunner(dag_plugin) result = self._run_dag(dag_name) cli_assert_equal(1, result) - self._assert_dag_fails_with_error(result, UserCodeError) + self._assert_dag_fails_with_error(result, DagValidationException) self.httpserver.stop() def test_dag_circular_dependency(self): diff --git a/projects/vdk-plugins/vdk-dag/tests/test_local_executor.py b/projects/vdk-plugins/vdk-dag/tests/test_local_executor.py index 2ede95747f..b00ed1c597 100644 --- a/projects/vdk-plugins/vdk-dag/tests/test_local_executor.py +++ b/projects/vdk-plugins/vdk-dag/tests/test_local_executor.py @@ -23,7 +23,7 @@ def reduce_retries_in_test_http_requests(): with mock.patch.dict( os.environ, { - "DAG_LOCAL_RUN_JOB_PATH": jobs_path_from_caller_directory(""), + "DAGS_LOCAL_RUN_JOB_PATH": jobs_path_from_caller_directory(""), "DAGS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS": "0", "DAGS_TIME_BETWEEN_STATUS_CHECK_SECONDS": "0", },