Skip to content

Commit

Permalink
Merge main into dependabot/gradle/projects/control-service/projects/c…
Browse files Browse the repository at this point in the history
…om.palantir.git-version-3.0.0
  • Loading branch information
github-actions[bot] authored Mar 30, 2023
2 parents 0784b13 + 9b742e3 commit 954df46
Show file tree
Hide file tree
Showing 15 changed files with 585 additions and 318 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# Copyright 2023-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import graphlib
import logging
from collections import namedtuple
from typing import Dict
from typing import List

from vdk.internal.core.errors import ErrorMessage
from vdk.internal.core.errors import UserCodeError

log = logging.getLogger(__name__)

# Named error categories used to classify validation failures in the
# user-facing error messages below.
Error = namedtuple("Error", ["TYPE", "PERMISSION", "REQUIREMENT", "CONFLICT"])
ERROR = Error(
    TYPE="type", PERMISSION="permission", REQUIREMENT="requirement", CONFLICT="conflict"
)
# Keys a Data Job dict may contain; anything else is rejected.
allowed_job_keys = {"job_name", "team_name", "fail_meta_job_on_error", "depends_on"}
# Keys a Data Job dict must contain.
required_job_keys = {"job_name", "depends_on"}


class DagValidator:
    """
    The purpose of this class is to validate the DAG structure and jobs.
    It is being used right before the DAG is built.
    """

    def validate(self, jobs: List[Dict]):
        """
        Validate the structure and the order of the DAG of Data Jobs.

        :param jobs: List of Data Jobs (DAG vertices) to be validated
        :raises UserCodeError: if any of the validation checks fails
        :return: None
        """
        # Reject non-dict entries up front so every later check can safely
        # index into each job dict (a bare string used to crash with an
        # AttributeError instead of a proper validation error).
        self._validate_job_types(jobs)
        self._validate_no_duplicates(jobs)
        for job in jobs:
            self._validate_job(job)
        self._check_dag_cycles(jobs)
        log.info("Successfully validated the DAG!")

    def _raise_error(
        self, jobs: List, error_type: str, reason: str, countermeasures: str
    ):
        # Single funnel for all validation failures so the resulting
        # UserCodeError carries a uniform, actionable message.
        raise UserCodeError(
            ErrorMessage(
                "",
                "Meta Job failed due to a Data Job validation failure.",
                f"There is a {error_type} error with job(s) {jobs}. " + reason,
                "The DAG will not be built and the Meta Job will fail.",
                countermeasures,
            )
        )

    def _validate_job_types(self, jobs: List[Dict]):
        # Every job must be a dict; report all offending entries at once.
        non_dict_jobs = [job for job in jobs if not isinstance(job, dict)]
        if non_dict_jobs:
            self._raise_error(
                non_dict_jobs,
                ERROR.TYPE,
                "One or more jobs are not dictionaries.",
                f"Change the jobs {non_dict_jobs} to be dictionaries - "
                f"each job must be a dict with keys from {allowed_job_keys}.",
            )

    def _validate_no_duplicates(self, jobs: List[Dict]):
        # jobs.count() makes this O(n^2) overall, but DAGs are small and
        # dicts are unhashable so a Counter cannot be used directly.
        # .get() keeps this safe for duplicated jobs that lack "job_name"
        # (previously a bare KeyError instead of a validation error).
        duplicated_jobs = list(
            {job.get("job_name") for job in jobs if jobs.count(job) > 1}
        )
        if duplicated_jobs:
            self._raise_error(
                duplicated_jobs,
                ERROR.CONFLICT,
                f"There are some duplicated jobs: {duplicated_jobs}.",
                f"Remove the duplicated jobs from the list - each job can appear in the jobs list at most once. "
                f"Duplicated jobs: {duplicated_jobs}.",
            )

    def _validate_job(self, job: Dict):
        # Run all single-job checks; raises on the first failed check.
        self._validate_allowed_and_required_keys(job)
        self._validate_job_name(job)
        self._validate_dependencies(job)
        self._validate_team_name(job)
        self._validate_fail_meta_job_on_error(job)
        log.info(f"Successfully validated job: {job['job_name']}")

    def _validate_allowed_and_required_keys(self, job: Dict):
        # Forbidden keys are checked before missing ones; only one of the
        # two errors can be reported per job since _raise_error raises.
        forbidden_keys = [key for key in job.keys() if key not in allowed_job_keys]
        if forbidden_keys:
            self._raise_error(
                list(job),
                ERROR.PERMISSION,
                "One or more job dict keys are not allowed.",
                f"Remove the forbidden Data Job Dict keys. "
                f"Keys {forbidden_keys} are forbidden. Allowed keys: {allowed_job_keys}.",
            )
        missing_keys = [key for key in required_job_keys if key not in job]
        if missing_keys:
            self._raise_error(
                list(job),
                ERROR.REQUIREMENT,
                "One or more job dict required keys are missing.",
                f"Add the missing required Data Job Dict keys. Keys {missing_keys} "
                f"are missing. Required keys: {required_job_keys}.",
            )

    def _validate_job_name(self, job: Dict):
        if not isinstance(job["job_name"], str):
            self._raise_error(
                list(job),
                ERROR.TYPE,
                "The type of the job dict key job_name is not string.",
                f"Change the Data Job Dict value of job_name. "
                f"Current type is {type(job['job_name'])}. Expected type is string.",
            )

    def _validate_dependencies(self, job: Dict):
        # isinstance against the builtin `list` (typing.List in isinstance
        # checks is deprecated).
        if not isinstance(job["depends_on"], list):
            self._raise_error(
                list(job),
                ERROR.TYPE,
                "The type of the job dict depends_on key is not list.",
                f"Check the Data Job Dict type of the depends_on key. Current type "
                f"is {type(job['depends_on'])}. Expected type is list.",
            )
        non_string_dependencies = [
            pred for pred in job["depends_on"] if not isinstance(pred, str)
        ]
        if non_string_dependencies:
            self._raise_error(
                list(job),
                ERROR.TYPE,
                "One or more items of the job dependencies list are not strings.",
                f"Check the Data Job Dict values of the depends_on list. "
                f"There are some non-string values: {non_string_dependencies}. Expected type is string.",
            )

    def _validate_team_name(self, job: Dict):
        # team_name is optional; only its type is validated when present.
        if "team_name" in job and not isinstance(job["team_name"], str):
            self._raise_error(
                list(job),
                ERROR.TYPE,
                # Fixed: this message previously referred to job_name.
                "The type of the job dict key team_name is not string.",
                f"Change the Data Job Dict value of team_name. "
                f"Current type is {type(job['team_name'])}. Expected type is string.",
            )

    def _validate_fail_meta_job_on_error(self, job: Dict):
        # fail_meta_job_on_error is optional; only its type is validated
        # when present.
        if "fail_meta_job_on_error" in job and not isinstance(
            (job["fail_meta_job_on_error"]), bool
        ):
            self._raise_error(
                list(job),
                ERROR.TYPE,
                "The type of the job dict key fail_meta_job_on_error is not bool (True/False).",
                f"Change the Data Job Dict value of fail_meta_job_on_error. Current type"
                f" is {type(job['fail_meta_job_on_error'])}. Expected type is bool.",
            )

    def _check_dag_cycles(self, jobs: List[Dict]):
        topological_sorter = graphlib.TopologicalSorter()
        for job in jobs:
            topological_sorter.add(job["job_name"], *job["depends_on"])

        try:
            # Preparing the sorter raises CycleError if cycles exist
            topological_sorter.prepare()
        except graphlib.CycleError as e:
            self._raise_error(
                # e.args[1] is the cycle path with the first node repeated
                # at the end; [:-1] drops that duplicate.
                e.args[1][:-1],
                ERROR.CONFLICT,
                "There is a cycle in the DAG.",
                f"Change the depends_on list of the jobs that participate in the detected cycle: {e.args[1]}.",
            )
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import os
import pprint
import sys
import time
Expand All @@ -13,6 +12,7 @@

from taurus_datajob_api import ApiException
from vdk.plugin.meta_jobs.cached_data_job_executor import TrackingDataJobExecutor
from vdk.plugin.meta_jobs.dag_validator import DagValidator
from vdk.plugin.meta_jobs.meta import TrackableJob
from vdk.plugin.meta_jobs.meta_configuration import MetaPluginConfiguration
from vdk.plugin.meta_jobs.remote_data_job_executor import RemoteDataJobExecutor
Expand Down Expand Up @@ -40,10 +40,11 @@ def __init__(self, team_name: str, meta_config: MetaPluginConfiguration):
executor=RemoteDataJobExecutor(),
time_between_status_check_seconds=meta_config.meta_jobs_time_between_status_check_seconds(),
)
self._dag_validator = DagValidator()

def build_dag(self, jobs: List[Dict]):
self._dag_validator.validate(jobs)
for job in jobs:
# TODO: add some job validation here; check the job exists, its previous jobs exists, etc
trackable_job = TrackableJob(
job["job_name"],
job.get("team_name", self._team_name),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[owner]
team = team-awesome
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """Submit a four-job DAG in which job1 lists itself as a dependency."""
    log.info(f"Dummy arguments {job_input.get_arguments()}")

    # job1's depends_on contains job1 itself — a one-node cycle.
    dag_jobs = [
        {"job_name": "job1", "depends_on": ["job1"]},
        {"job_name": "job2", "depends_on": ["job1"]},
        {"job_name": "job3", "depends_on": ["job1"]},
        {"job_name": "job4", "depends_on": ["job2", "job3"]},
    ]
    MetaJobInput().run_meta_job(dag_jobs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[owner]
team = team-awesome
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """Submit a four-job DAG in which job2 appears twice in the jobs list."""
    log.info(f"Dummy arguments {job_input.get_arguments()}")

    # The same job2 dict object is passed twice — a duplicated job.
    duplicated = {"job_name": "job2", "depends_on": ["job1"]}
    dag_jobs = [
        {"job_name": "job1", "depends_on": []},
        duplicated,
        duplicated,
        {"job_name": "job3", "depends_on": ["job1"]},
        {"job_name": "job4", "depends_on": ["job2", "job3"]},
    ]
    MetaJobInput().run_meta_job(dag_jobs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[owner]
team = team-awesome
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """Submit a four-job DAG whose first job uses the key 'depend_on' (sic)."""
    log.info(f"Dummy arguments {job_input.get_arguments()}")

    # NOTE: "depend_on" (not "depends_on") on job1 is kept verbatim —
    # presumably this fixture exercises key-validation of job dicts.
    dag_jobs = [
        {"job_name": "job1", "depend_on": []},
        {"job_name": "job2", "depends_on": ["job1"]},
        {"job_name": "job3", "depends_on": ["job1"]},
        {"job_name": "job4", "depends_on": ["job2", "job3"]},
    ]
    MetaJobInput().run_meta_job(dag_jobs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[owner]
team = team-awesome
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """Submit a four-job DAG where job1's fail_meta_job_on_error is an int."""
    log.info(f"Dummy arguments {job_input.get_arguments()}")

    # fail_meta_job_on_error is deliberately 1 (int, not bool) — kept
    # verbatim; presumably this fixture exercises type validation.
    dag_jobs = [
        {"job_name": "job1", "fail_meta_job_on_error": 1, "depends_on": []},
        {"job_name": "job2", "depends_on": ["job1"]},
        {"job_name": "job3", "depends_on": ["job1"]},
        {"job_name": "job4", "depends_on": ["job2", "job3"]},
    ]
    MetaJobInput().run_meta_job(dag_jobs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[owner]
team = team-awesome
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """Submit a DAG whose first entry is a bare string instead of a job dict."""
    log.info(f"Dummy arguments {job_input.get_arguments()}")

    # The first entry is the plain string "job1", not a dict — kept verbatim.
    dag_jobs = [
        "job1",
        {"job_name": "job2", "depends_on": ["job1"]},
        {"job_name": "job3", "depends_on": ["job1"]},
        {"job_name": "job4", "depends_on": ["job2", "job3"]},
    ]
    MetaJobInput().run_meta_job(dag_jobs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[owner]
team = team-awesome
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """Submit a four-job DAG where job1 and job2 depend on each other."""
    log.info(f"Dummy arguments {job_input.get_arguments()}")

    # job1 <-> job2 form a two-node cycle.
    dag_jobs = [
        {"job_name": "job1", "depends_on": ["job2"]},
        {"job_name": "job2", "depends_on": ["job1"]},
        {"job_name": "job3", "depends_on": ["job1"]},
        {"job_name": "job4", "depends_on": ["job2", "job3"]},
    ]
    MetaJobInput().run_meta_job(dag_jobs)
Loading

0 comments on commit 954df46

Please sign in to comment.