Skip to content

Commit

Permalink
vdk-meta-jobs: exec job with arguments (#1839)
Browse files Browse the repository at this point in the history
What:
Introduce execution of jobs with arguments (synchronously) via DAGs.

The way a job dict looks now:
{
        "job_name": "name-of-job",
        "team_name": "team-of-job",
        "fail_meta_job_on_error": True or False,
        **"arguments": <json-serializable dict>**,
        "depends_on": [name-of-job1, name-of-job2]
}

GitHub Issue:
[https://github.com/vmware/versatile-data-kit/issues/1502](https://github.com/vmware/versatile-data-kit/issues/1502)

Testing Done: existing tests + new unit test

Closes #1502

Signed-off-by: Yoan Salambashev <[email protected]>
  • Loading branch information
yonitoo authored and mivanov1988 committed Apr 11, 2023
1 parent a29c7ba commit cec4857
Show file tree
Hide file tree
Showing 15 changed files with 237 additions and 54 deletions.
10 changes: 9 additions & 1 deletion projects/vdk-plugins/vdk-meta-jobs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def run(job_input):
"job_name": "name-of-job",
"team_name": "team-of-job",
"fail_meta_job_on_error": True or False,
"arguments": {"key1": value1, "key2": value2},
"depends_on": [name-of-job1, name-of-job2]
},
...
Expand All @@ -38,7 +39,8 @@ def run(job_input):
When defining a job to be run following attributes are supported:
* **job_name**: required, the name of the data job
* **team_name**: optional, the team of the data job. If omitted, it will use the meta job's team.
* **fail_meta_job_on_error**: optional, default is true. if true, the meta job will abort and fail if the orchestrated job fails, if false, meta job won't fail and continue.
* **fail_meta_job_on_error**: optional, default is true. If true, the meta job will abort and fail if the orchestrated job fails; if false, the meta job will not fail and will continue.
* **arguments**: optional, a JSON-serializable dict of arguments that are passed to the underlying orchestrated data job.
* **depends_on**: required (can be empty), list of other jobs that the orchestrated job depends on. The job will not be started until all jobs listed in depends_on have finished.


Expand All @@ -65,38 +67,44 @@ JOBS_RUN_ORDER = [
"job_name": "job1",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"arguments": {},
"depends_on": []
},

{
"job_name": "job2",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"arguments": {},
"depends_on": ["job1"]
},
{
"job_name": "job3",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"arguments": {},
"depends_on": ["job1"]
},
{
"job_name": "job4",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"arguments": {},
"depends_on": ["job1"]
},

{
"job_name": "job5",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"arguments": {},
"depends_on": ["job3"]
},
{
"job_name": "job6",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"arguments": {},
"depends_on": ["job3"]
},
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class SingleJob:
job_name: str
team_name: str = None
fail_meta_job_on_error: bool = True
arguments: dict = None
depends_on: List[str] = field(default_factory=list)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import os
import time
from typing import Dict
from typing import Optional

import urllib3.exceptions as url_exception
from vdk.api.job_input import IJobArguments
from vdk.internal.core.errors import ErrorMessage
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.meta_jobs.meta import IDataJobExecutor
Expand Down Expand Up @@ -44,7 +44,7 @@ def start_job(self, job_name: str) -> None:
job = self.__get_job(job_name)
job.start_attempt += 1
execution_id = self.start_new_job_execution(
job_name=job.job_name, team_name=job.team_name
job_name=job.job_name, team_name=job.team_name, arguments=job.arguments
)
log.info(f"Starting new data job execution with id {execution_id}")
job.execution_id = execution_id
Expand Down Expand Up @@ -135,7 +135,9 @@ def get_all_jobs(self):
def get_currently_running_jobs(self):
return [j for j in self._jobs_cache.values() if j.status in ACTIVE_JOB_STATUSES]

def start_new_job_execution(self, job_name: str, team_name: str) -> str:
def start_new_job_execution(
self, job_name: str, team_name: str, arguments: IJobArguments = None
) -> str:
"""
Start a new data job execution.
The stages of the process are:
Expand All @@ -152,6 +154,7 @@ def start_new_job_execution(self, job_name: str, team_name: str) -> str:
:param job_name: name of the data job to be executed
:param team_name: name of the owning team
:param arguments: arguments of the data job
:return: id of the started job execution
"""
current_retries = 0
Expand All @@ -163,7 +166,7 @@ def start_new_job_execution(self, job_name: str, team_name: str) -> str:

while current_retries < ALLOWED_RETRIES:
try:
execution_id = self._executor.start_job(job_name, team_name)
execution_id = self._executor.start_job(job_name, team_name, arguments)
return execution_id
except url_exception.TimeoutError as e:
log.info(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright 2023-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import graphlib
import json
import logging
from collections import namedtuple
from typing import Dict
Expand All @@ -14,7 +15,13 @@
ERROR = Error(
TYPE="type", PERMISSION="permission", REQUIREMENT="requirement", CONFLICT="conflict"
)
allowed_job_keys = {"job_name", "team_name", "fail_meta_job_on_error", "depends_on"}
allowed_job_keys = {
"job_name",
"team_name",
"fail_meta_job_on_error",
"depends_on",
"arguments",
}
required_job_keys = {"job_name", "depends_on"}


Expand All @@ -37,7 +44,7 @@ def validate(self, jobs: List[Dict]):
log.info("Successfully validated the DAG!")

def _raise_error(
self, jobs: List[Dict], error_type: str, reason: str, countermeasures: str
self, error_type: str, reason: str, countermeasures: str, jobs: List[str] = ""
):
raise UserCodeError(
ErrorMessage(
Expand All @@ -53,35 +60,50 @@ def _validate_no_duplicates(self, jobs: List[Dict]):
duplicated_jobs = list({job["job_name"] for job in jobs if jobs.count(job) > 1})
if duplicated_jobs:
self._raise_error(
duplicated_jobs,
ERROR.CONFLICT,
f"There are some duplicated jobs: {duplicated_jobs}.",
f"Remove the duplicated jobs from the list - each job can appear in the jobs list at most once. "
f"Duplicated jobs: {duplicated_jobs}.",
duplicated_jobs,
)

def _validate_job(self, job: Dict):
self._validate_job_type(job)
self._validate_allowed_and_required_keys(job)
self._validate_job_name(job)
self._validate_dependencies(job)
self._validate_team_name(job)
self._validate_fail_meta_job_on_error(job)
log.info(f"Successfully validated job: {job['job_name']}")
job_name = job.get("job_name")
self._validate_dependencies(job_name, job["depends_on"])
if "team_name" in job:
self._validate_team_name(job_name, job["team_name"])
if "fail_meta_job_on_error" in job:
self._validate_fail_meta_job_on_error(
job_name, job["fail_meta_job_on_error"]
)
if "arguments" in job:
self._validate_arguments(job_name, job["arguments"])
log.info(f"Successfully validated job: {job_name}")

def _validate_job_type(self, job: Dict):
    """
    Ensure a single job entry in the jobs list is a dict.

    :param job: the job entry to check
    :raises UserCodeError: via _raise_error when the entry is not a dict
    """
    if not isinstance(job, dict):
        self._raise_error(
            ERROR.TYPE,
            "The job type is not dict.",
            f"Change the Data Job type. Current type is {type(job)}. Expected type is dict.",
            # NOTE(review): joins the iterated contents of the non-dict value
            # into one string for error reporting; presumably assumes the bad
            # value is still iterable (e.g. a list/str) — confirm this cannot
            # itself raise for non-iterable entries such as int.
            ["".join(list(job))],
        )

def _validate_allowed_and_required_keys(self, job: Dict):
forbidden_keys = [key for key in job.keys() if key not in allowed_job_keys]
if forbidden_keys:
disallowed_keys = [key for key in job.keys() if key not in allowed_job_keys]
if disallowed_keys:
self._raise_error(
list(job),
ERROR.PERMISSION,
"One or more job dict keys are not allowed.",
f"Remove the forbidden Data Job Dict keys. "
f"Keys {forbidden_keys} are forbidden. Allowed keys: {allowed_job_keys}.",
f"Remove the disallowed Data Job Dict keys. "
f"Keys {disallowed_keys} are not allowed. Allowed keys: {allowed_job_keys}.",
)
missing_keys = [key for key in required_job_keys if key not in job]
if missing_keys:
self._raise_error(
list(job),
ERROR.REQUIREMENT,
"One or more job dict required keys are missing.",
f"Add the missing required Data Job Dict keys. Keys {missing_keys} "
Expand All @@ -91,54 +113,74 @@ def _validate_allowed_and_required_keys(self, job: Dict):
def _validate_job_name(self, job: Dict):
if not isinstance(job["job_name"], str):
self._raise_error(
list(job),
ERROR.TYPE,
"The type of the job dict key job_name is not string.",
f"Change the Data Job Dict value of job_name. "
f"Current type is {type(job['job_name'])}. Expected type is string.",
["".join(list(job))],
)

def _validate_dependencies(self, job: Dict):
if not (isinstance(job["depends_on"], List)):
def _validate_dependencies(self, job_name: str, dependencies: List[str]):
if not (isinstance(dependencies, List)):
self._raise_error(
list(job),
ERROR.TYPE,
"The type of the job dict depends_on key is not list.",
f"Check the Data Job Dict type of the depends_on key. Current type "
f"is {type(job['depends_on'])}. Expected type is list.",
f"is {type(dependencies)}. Expected type is list.",
[job_name],
)
non_string_dependencies = [
pred for pred in job["depends_on"] if not isinstance(pred, str)
pred for pred in dependencies if not isinstance(pred, str)
]
if non_string_dependencies:
self._raise_error(
list(job),
ERROR.TYPE,
"One or more items of the job dependencies list are not strings.",
f"Check the Data Job Dict values of the depends_on list. "
f"There are some non-string values: {non_string_dependencies}. Expected type is string.",
[job_name],
)

def _validate_team_name(self, job: Dict):
if "team_name" in job and not isinstance(job["team_name"], str):
def _validate_team_name(self, job_name: str, team_name: str):
if not isinstance(team_name, str):
self._raise_error(
list(job),
ERROR.TYPE,
"The type of the job dict key job_name is not string.",
f"Change the Data Job Dict value of team_name. "
f"Current type is {type(job['team_name'])}. Expected type is string.",
f"Current type is {type(team_name)}. Expected type is string.",
[job_name],
)

def _validate_fail_meta_job_on_error(self, job: Dict):
if "fail_meta_job_on_error" in job and not isinstance(
(job["fail_meta_job_on_error"]), bool
):
def _validate_fail_meta_job_on_error(
self, job_name: str, fail_meta_job_on_error: bool
):
if not isinstance(fail_meta_job_on_error, bool):
self._raise_error(
list(job),
ERROR.TYPE,
"The type of the job dict key fail_meta_job_on_error is not bool (True/False).",
f"Change the Data Job Dict value of fail_meta_job_on_error. Current type"
f" is {type(job['fail_meta_job_on_error'])}. Expected type is bool.",
f" is {type(fail_meta_job_on_error)}. Expected type is bool.",
[job_name],
)

def _validate_arguments(self, job_name: str, job_args: dict):
    """
    Validate the optional "arguments" entry of a job dict.

    :param job_name: name of the job being validated (used in error reporting)
    :param job_args: the value of the job dict "arguments" key; must be a
        dict that is serializable as JSON
    :raises UserCodeError: via _raise_error when job_args is not a dict or
        cannot be serialized with json.dumps
    """
    # Type check first: the arguments must be a dict of key/value pairs.
    if not isinstance(job_args, dict):
        self._raise_error(
            ERROR.TYPE,
            "The type of the job dict key arguments is not dict.",
            f"Change the Data Job Dict value of arguments. "
            f"Current type is {type(job_args)}. Expected type is dict.",
            [job_name],
        )
    # Even a dict may contain values json cannot encode (sets, arbitrary
    # objects, ...); probe serialization now so the problem is reported at
    # validation time rather than when the job execution is started.
    try:
        json.dumps(job_args)
    except TypeError as e:
        self._raise_error(
            ERROR.TYPE,
            str(e),
            f"Change the Data Job Dict value of arguments. "
            f"Current type is {type(job_args)} but not serializable as JSON.",
            [job_name],
        )

def _check_dag_cycles(self, jobs: List[Dict]):
Expand All @@ -151,8 +193,8 @@ def _check_dag_cycles(self, jobs: List[Dict]):
topological_sorter.prepare()
except graphlib.CycleError as e:
self._raise_error(
e.args[1][:-1],
ERROR.CONFLICT,
"There is a cycle in the DAG.",
f"Change the depends_on list of the jobs that participate in the detected cycle: {e.args[1]}.",
e.args[1][:-1],
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
from typing import Optional

from taurus_datajob_api import DataJobExecution
from vdk.api.job_input import IJobArguments
from vdk.plugin.meta_jobs.api import meta_job


class IDataJobExecutor(abc.ABC):
@abc.abstractmethod
def start_job(self, job_name: str, team_name: str):
def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None):
"""
Start an execution of a data job and returns its execution id
:param arguments:
:param job_name:
:param team_name:
:return: execution id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@


class MetaJobsDag:
def __init__(self, team_name: str, meta_config: MetaPluginConfiguration):
def __init__(
self,
team_name: str,
meta_config: MetaPluginConfiguration,
):
self._team_name = team_name
self._topological_sorter = TopologicalSorter()
self._delayed_starting_jobs = TimeBasedQueue(
Expand All @@ -49,6 +53,7 @@ def build_dag(self, jobs: List[Dict]):
job["job_name"],
job.get("team_name", self._team_name),
job.get("fail_meta_job_on_error", True),
job.get("arguments", None),
)
self._job_executor.register_job(trackable_job)
self._topological_sorter.add(trackable_job.job_name, *job["depends_on"])
Expand Down
Loading

0 comments on commit cec4857

Please sign in to comment.