Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-dag: improve error handling and error messages #3152

Merged
merged 3 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.dag.dags import IDataJobExecutor
from vdk.plugin.dag.dags import TrackableJob
from vdk.plugin.dag.exception import DagJobExecutionException
from vdk.plugin.dag.remote_data_job import JobStatus

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -91,22 +92,12 @@ def finalize_job(self, job_name):
job.details = details
log.info(
f"Finished data job {job_name}:\n"
f"start_time: {details['start_time']}\n"
f"end_time: {details.get('end_time')}\n"
f"status: {details['status']}\n"
f"message: {details['message']}"
f" start_time: {details['start_time']}\n"
f" end_time: {details.get('end_time')}\n"
f" status: {details['status']}\n"
)
if job.status != JobStatus.SUCCEEDED.value and job.fail_dag_on_error:
raise UserCodeError(
ErrorMessage(
"",
"DAG failed due to a Data Job failure.",
f"Data Job {job_name} failed. See details: {details}",
"The rest of the jobs in the DAG will not be started "
"and the DAG will fail.",
"Investigate the error in the job or re-try again.",
)
)
raise DagJobExecutionException(job_name, details)

@staticmethod
def __get_printable_details(details):
Expand Down
72 changes: 35 additions & 37 deletions projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
from typing import Dict
from typing import List

from vdk.internal.core.errors import ErrorMessage
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.dag.exception import DagValidationException

log = logging.getLogger(__name__)
Error = namedtuple("Error", ["TYPE", "PERMISSION", "REQUIREMENT", "CONFLICT"])
Expand Down Expand Up @@ -45,23 +44,10 @@ def validate(self, jobs: List[Dict]):
self._check_dag_cycles(jobs)
log.debug("Successfully validated the DAG!")

def _raise_error(
self, error_type: str, reason: str, countermeasures: str, jobs: List[str] = ""
):
raise UserCodeError(
ErrorMessage(
"",
"DAG failed due to a Data Job validation failure.",
f"There is a {error_type} error with job(s) {jobs}. " + reason,
"The DAG will not be built and will fail.",
countermeasures,
)
)

def _validate_no_duplicates(self, jobs: List[Dict]):
duplicated_jobs = list({job["job_name"] for job in jobs if jobs.count(job) > 1})
if duplicated_jobs:
self._raise_error(
raise DagValidationException(
ERROR.CONFLICT,
f"There are some duplicated jobs: {duplicated_jobs}.",
f"Remove the duplicated jobs from the list - each job can appear in the jobs list at most once. "
Expand All @@ -85,100 +71,111 @@ def _validate_job(self, job: Dict):

def _validate_job_type(self, job: Dict):
if not isinstance(job, dict):
self._raise_error(
jobs = ["".join(list(job))]
raise DagValidationException(
ERROR.TYPE,
"The job type is not dict.",
f"Change the Data Job type. Current type is {type(job)}. Expected type is dict.",
["".join(list(job))],
jobs,
)

def _validate_allowed_and_required_keys(self, job: Dict):
disallowed_keys = [key for key in job.keys() if key not in allowed_job_keys]
if disallowed_keys:
self._raise_error(
raise DagValidationException(
ERROR.PERMISSION,
"One or more job dict keys are not allowed.",
f"Remove the disallowed Data Job Dict keys. "
f"Keys {disallowed_keys} are not allowed. Allowed keys: {allowed_job_keys}.",
None,
)
missing_keys = [key for key in required_job_keys if key not in job]
if missing_keys:
self._raise_error(
raise DagValidationException(
ERROR.REQUIREMENT,
"One or more job dict required keys are missing.",
f"Add the missing required Data Job Dict keys. Keys {missing_keys} "
f"are missing. Required keys: {required_job_keys}.",
None,
)

def _validate_job_name(self, job: Dict):
if not isinstance(job["job_name"], str):
self._raise_error(
jobs = ["".join(list(job))]
raise DagValidationException(
ERROR.TYPE,
"The type of the job dict key job_name is not string.",
f"Change the Data Job Dict value of job_name. "
f"Current type is {type(job['job_name'])}. Expected type is string.",
["".join(list(job))],
jobs,
)

def _validate_dependencies(self, job_name: str, dependencies: List[str]):
if not (isinstance(dependencies, List)):
self._raise_error(
jobs = [job_name]
raise DagValidationException(
ERROR.TYPE,
"The type of the job dict depends_on key is not list.",
f"Check the Data Job Dict type of the depends_on key. Current type "
f"is {type(dependencies)}. Expected type is list.",
[job_name],
jobs,
)
non_string_dependencies = [
pred for pred in dependencies if not isinstance(pred, str)
]
if non_string_dependencies:
self._raise_error(
jobs1 = [job_name]
raise DagValidationException(
ERROR.TYPE,
"One or more items of the job dependencies list are not strings.",
f"Check the Data Job Dict values of the depends_on list. "
f"There are some non-string values: {non_string_dependencies}. Expected type is string.",
[job_name],
jobs1,
)

def _validate_team_name(self, job_name: str, team_name: str):
if not isinstance(team_name, str):
self._raise_error(
jobs = [job_name]
raise DagValidationException(
ERROR.TYPE,
"The type of the job dict key team_name is not string.",
f"Change the Data Job Dict value of team_name. "
f"Current type is {type(team_name)}. Expected type is string.",
[job_name],
jobs,
)

def _validate_fail_dag_on_error(self, job_name: str, fail_dag_on_error: bool):
if not isinstance(fail_dag_on_error, bool):
self._raise_error(
jobs = [job_name]
raise DagValidationException(
ERROR.TYPE,
"The type of the job dict key fail_dag_on_error is not bool (True/False).",
f"Change the Data Job Dict value of fail_dag_on_error. Current type"
f" is {type(fail_dag_on_error)}. Expected type is bool.",
[job_name],
jobs,
)

def _validate_arguments(self, job_name: str, job_args: dict):
if not isinstance(job_args, dict):
self._raise_error(
jobs = [job_name]
raise DagValidationException(
ERROR.TYPE,
"The type of the job dict key arguments is not dict.",
f"Change the Data Job Dict value of arguments. "
f"Current type is {type(job_args)}. Expected type is dict.",
[job_name],
jobs,
)
try:
json.dumps(job_args)
except TypeError as e:
self._raise_error(
reason = str(e)
jobs1 = [job_name]
raise DagValidationException(
ERROR.TYPE,
str(e),
reason,
f"Change the Data Job Dict value of arguments. "
f"Current type is {type(job_args)} but not serializable as JSON.",
[job_name],
jobs1,
)

def _check_dag_cycles(self, jobs: List[Dict]):
Expand All @@ -190,9 +187,10 @@ def _check_dag_cycles(self, jobs: List[Dict]):
# Preparing the sorter raises CycleError if cycles exist
topological_sorter.prepare()
except graphlib.CycleError as e:
self._raise_error(
jobs1 = e.args[1][:-1]
raise DagValidationException(
ERROR.CONFLICT,
"There is a cycle in the DAG.",
f"Change the depends_on list of the jobs that participate in the detected cycle: {e.args[1]}.",
e.args[1][:-1],
jobs1,
)
85 changes: 85 additions & 0 deletions projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright 2023-2024 Broadcom
# SPDX-License-Identifier: Apache-2.0
from typing import List
from typing import Optional

from vdk.internal.core.error_classifiers import ResolvableBy
from vdk.internal.core.error_classifiers import ResolvableByActual
from vdk.internal.core.errors import BaseVdkError


class DagValidationException(BaseVdkError):
    """
    Exception raised for errors during DAG data job validation.

    The constructor stores its arguments as attributes and builds a
    multi-line, human-readable message that is handed to ``BaseVdkError``
    together with ``ResolvableByActual.USER`` and ``ResolvableBy.USER_ERROR``.

    :param error_type: The type of error encountered.
    :param reason: Explanation of why the error occurred.
    :param countermeasures: Suggested actions to resolve the error.
    :param jobs: List of jobs associated with the error, defaults to None.
        ``None`` is stored as an empty list and rendered as "N/A".
    """

    def __init__(
        self,
        error_type: str,
        reason: str,
        countermeasures: str,
        jobs: Optional[List[str]] = None,
    ):
        self.jobs = jobs if jobs is not None else []
        self.error_type = error_type
        self.reason = reason
        self.countermeasures = countermeasures

        # Each entry is converted with str() before joining: validation errors
        # are raised precisely when the job definitions are malformed, so an
        # entry may not be a string. A plain ", ".join(self.jobs) would then
        # raise TypeError here and hide the validation error being reported.
        if self.jobs:
            jobs_formatted = ", ".join(str(job) for job in self.jobs)
        else:
            jobs_formatted = "N/A"
        message = (
            f"DAG Validation Error:\n"
            f" - Error Type: {self.error_type}\n"
            f" - Affected Jobs: {jobs_formatted}\n"
            f" - Reason: {self.reason}\n"
            f" - Countermeasures: {self.countermeasures}"
        )
        super().__init__(ResolvableByActual.USER, ResolvableBy.USER_ERROR, message)


class DagJobExecutionException(BaseVdkError):
    """
    Raised when a job that is part of a DAG finishes unsuccessfully.

    :param str job_name: Name of the job that failed.
    :param dict details: Optional failure details; ``None`` is stored as an
        empty dict and rendered as "None" in the message.
    """

    def __init__(self, job_name: str, details: Optional[dict] = None):
        self.job_name = job_name
        self.details = {} if details is None else details

        rendered_details = self.format_details(self.details)
        header = f"Failure in DAG execution - Job '{self.job_name}' failed.\n"
        body = f" - Failed Job details:\n{rendered_details}"

        # The DAG is always reported as a user error, whatever the resolvable
        # type of the failed job was: the DAG itself did not break because of
        # the platform. If the failed job was a platform error, the platform
        # is still alerted through that job, while the owner of the DAG is the
        # one who has to look at the DAG.
        super().__init__(
            ResolvableByActual.USER, ResolvableBy.USER_ERROR, header + body
        )

    @staticmethod
    def format_details(details: dict) -> str:
        """
        Render ``details`` as indented ``key: value`` lines.

        Entries whose value is falsy are omitted. Dictionaries nested directly
        under the top level are expanded one level deeper; anything nested
        further is printed with its default string conversion.

        :param details: The details dictionary to render.
        :return: The rendered text, or the string "None" when ``details`` is empty.
        """
        if not details:
            return "None"

        def render(mapping, depth=0, indent_prefix=" ", initial_prefix=" "):
            pad = initial_prefix + indent_prefix * depth
            pieces = []
            for key, value in mapping.items():
                if not value:
                    # Falsy values (None, "", 0, False, empty containers) are skipped.
                    continue
                if depth < 1 and isinstance(value, dict):
                    # Nested dictionaries are expanded only up to the 2nd level.
                    rendered = "\n" + render(value, depth + 1, indent_prefix)
                else:
                    rendered = f"{value}\n"
                pieces.append(f"{pad}{key}: {rendered}")
            return "".join(pieces)

        return render(details)
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ def _determine_status_without_summary(self, result: int) -> str:
return JobStatus.SUCCEEDED.value

def _update_message_with_summary(self, content: str):
self._message = {"summary": json.loads(content), "logs": self._log_file}
self._message = json.loads(content)
self._message["logs"] = self._log_file


class LocalDataJobRunException(Exception):
Expand All @@ -158,9 +159,10 @@ def __find_job_path(job_name: str):
candidates = [
os.getcwd(),
]
# TODO: expose this using the plugin configuration mechanisms (which infers also from env. vars among others)
candidates += [
part
for part in os.environ.get("DAG_LOCAL_RUN_JOB_PATH", "").split(":")
for part in os.environ.get("DAGS_LOCAL_RUN_JOB_PATH", "").split(":")
if part
]

Expand Down
3 changes: 2 additions & 1 deletion projects/vdk-plugins/vdk-dag/tests/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.dag import dag_plugin
from vdk.plugin.dag import dag_runner
from vdk.plugin.dag.exception import DagValidationException
from vdk.plugin.test_utils.util_funcs import cli_assert_equal
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner
from vdk.plugin.test_utils.util_funcs import jobs_path_from_caller_directory
Expand Down Expand Up @@ -436,7 +437,7 @@ def _test_dag_validation(self, dag_name):
self.runner = CliEntryBasedTestRunner(dag_plugin)
result = self._run_dag(dag_name)
cli_assert_equal(1, result)
self._assert_dag_fails_with_error(result, UserCodeError)
self._assert_dag_fails_with_error(result, DagValidationException)
self.httpserver.stop()

def test_dag_circular_dependency(self):
Expand Down
2 changes: 1 addition & 1 deletion projects/vdk-plugins/vdk-dag/tests/test_local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def reduce_retries_in_test_http_requests():
with mock.patch.dict(
os.environ,
{
"DAG_LOCAL_RUN_JOB_PATH": jobs_path_from_caller_directory(""),
"DAGS_LOCAL_RUN_JOB_PATH": jobs_path_from_caller_directory(""),
"DAGS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS": "0",
"DAGS_TIME_BETWEEN_STATUS_CHECK_SECONDS": "0",
},
Expand Down
Loading