Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-meta-jobs: improve DAGs code documentation #1873

Merged
merged 17 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
053d90a
vdk-meta-jobs: improve root dir code docs
yonitoo Apr 10, 2023
0db2b23
vdk-meta-jobs: fix spelling error
yonitoo Apr 10, 2023
837eee0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 10, 2023
991079b
vdk-meta-jobs: use seealso directives for config vars
yonitoo Apr 12, 2023
1746764
vdk-meta-jobs: add queue type and rename plugin_entry to dags_plugin
yonitoo Apr 12, 2023
2afc964
vdk-meta-jobs: switch to dags_plugin in setup.py
yonitoo Apr 12, 2023
38305da
Merge branch 'main' into person/ysalambashev/update-dags-code-docs
yonitoo Apr 12, 2023
36c65e5
vdk-meta-jobs: rename plugin_entry in new post-merge unit tests
yonitoo Apr 12, 2023
88be1b9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 12, 2023
8fe1882
Merge branch 'main' into person/ysalambashev/update-dags-code-docs
yonitoo Apr 12, 2023
8e909a2
Merge branch 'main' into person/ysalambashev/update-dags-code-docs
yonitoo Apr 12, 2023
0fa2d2d
Merge branch 'main' into person/ysalambashev/update-dags-code-docs
yonitoo Apr 12, 2023
49870bd
vdk-meta-jobs: remove some obvious docs
yonitoo Apr 13, 2023
75f3d4b
Merge branch 'main' into person/ysalambashev/update-dags-code-docs
yonitoo Apr 13, 2023
c8c49a6
Merge branch 'main' into person/ysalambashev/update-dags-code-docs
yonitoo Apr 13, 2023
6a07f0f
vdk-meta-jobs: change some docs and remove redundant ones
yonitoo Apr 13, 2023
0401d07
Merge branch 'main' into person/ysalambashev/update-dags-code-docs
yonitoo Apr 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion projects/vdk-plugins/vdk-meta-jobs/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
packages=setuptools.find_namespace_packages(where="src"),
# This is the only vdk plugin specific part
# Define entry point called "vdk.plugin.run" with name of plugin and module to act as entry point.
entry_points={"vdk.plugin.run": ["meta-jobs = vdk.plugin.meta_jobs.plugin_entry"]},
entry_points={"vdk.plugin.run": ["meta-jobs = vdk.plugin.meta_jobs.dags_plugin"]},
classifiers=[
"Development Status :: 2 - Pre-Alpha",
"License :: OSI Approved :: Apache Software License",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
@dataclass
class SingleJob:
"""
TODO
This class represents a single job to be executed.

:param job_name: the name of the job.
:param team_name: the name of the team that owns the job.
:param fail_dag_on_error: boolean flag indicating whether an error in this job should cause the whole DAG to fail.
:param arguments: JSON-serializable dictionary of arguments to be passed to the job.
:param depends_on: list of names of jobs that this job depends on.
"""

job_name: str
Expand All @@ -23,18 +29,25 @@ class SingleJob:
@dataclass
class MetaJob(SingleJob):
    """
    Represents a DAG job: a single job in its own right, which is composed
    of the single jobs it orchestrates.

    :param jobs: list of the orchestrated jobs
    """

    # The single jobs orchestrated by this DAG job.
    jobs: List[SingleJob] = field(default_factory=list)


class IMetaJobInput(abc.ABC):
    """
    Interface responsible for running a DAG job.
    """

    @abstractmethod
    def run_meta_job(self, meta_job: MetaJob):
        """
        Runs the given DAG job.

        :param meta_job: the DAG job to be run
        """
        pass
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,41 @@


class TrackingDataJobExecutor:
"""
The purpose of this class is to execute Data Jobs, track them and change their
statuses accordingly.
"""

def __init__(
    self, executor: IDataJobExecutor, time_between_status_check_seconds: int
):
    """
    :param executor: the Data Job executor used to start and query jobs
    :param time_between_status_check_seconds: the number of seconds to wait
        between consecutive status checks of a started job
    """
    self._executor = executor
    # Cache of registered jobs keyed by job name.
    self._jobs_cache: Dict[str, TrackableJob] = dict()
    self._time_between_status_check_seconds = time_between_status_check_seconds

def register_job(self, job: TrackableJob):
    """
    Registers a Data Job by adding it to the cache.

    If a job with the same name is already registered, it is overwritten
    and a warning is logged.

    :param job: the job to be added to the cache
    """
    if job.job_name in self._jobs_cache:
        log.warning(
            f"Job with name {job.job_name} already exists. Details: {self._jobs_cache[job.job_name]}. "
            f"Will overwrite it."
        )
    self._jobs_cache[job.job_name] = job

def start_job(self, job_name: str) -> None:
"""
Starts a Data Job.

:param job_name: the job to start and track
"""
job = self.__get_job(job_name)
Expand All @@ -57,6 +75,12 @@ def start_job(self, job_name: str) -> None:
)

def finalize_job(self, job_name):
"""
Finalizes a finished job by updating its details and logging them or raising an error.

:param job_name: the name of the job
:return:
"""
job = self.__get_job(job_name)
details = self._executor.details_job(
job.job_name, job.team_name, job.execution_id
Expand Down Expand Up @@ -100,6 +124,12 @@ def __is_job_submitted(job: TrackableJob):
return job.status is not None

def status(self, job_name: str) -> str:
"""
Gets the status of a job.

:param job_name: the name of the job
:return: the job status
"""
job = self.__get_job(job_name)
if job.status in ACTIVE_JOB_STATUSES:
job.status = self._executor.status_job(
Expand All @@ -109,6 +139,9 @@ def status(self, job_name: str) -> str:
return job.status

def get_finished_job_names(self):
"""
:return: list of the names of all the finalized jobs
"""
finalized_jobs = []
# TODO: optimize
# Do not call the status every time (use TTL caching)
Expand All @@ -130,9 +163,15 @@ def get_finished_job_names(self):
return finalized_jobs

def get_all_jobs(self):
    """
    :return: list of every registered job (in registration order)
    """
    return [*self._jobs_cache.values()]

def get_currently_running_jobs(self):
    """
    :return: list of jobs whose current status is SUBMITTED or RUNNING
    """
    active_jobs = []
    for tracked in self._jobs_cache.values():
        if tracked.status in ACTIVE_JOB_STATUSES:
            active_jobs.append(tracked)
    return active_jobs

def start_new_job_execution(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class DagValidator:
def validate(self, jobs: List[Dict]):
"""
Validate the structure and the order of the DAG of Data Jobs.

:param jobs: List of Data Jobs (DAG vertices) to be validated
:return:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def run_job(context: JobContext) -> None:
@staticmethod
@hookimpl
def vdk_configure(config_builder: ConfigurationBuilder) -> None:
    """
    VDK configuration hook: registers the configuration settings needed
    for DAGs, with reasonable defaults, by delegating to add_definitions().
    """
    add_definitions(config_builder)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@


class IDataJobExecutor(abc.ABC):
"""
This class is responsible for the execution of Data Jobs.
"""

@abc.abstractmethod
def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None):
"""
Expand Down Expand Up @@ -60,6 +64,10 @@ def job_executions_list(

@dataclass
class TrackableJob(meta_job.SingleJob):
"""
This class provides the ability to track the status details of a Data Job during execution.
"""

status: str = None
execution_id: str = None
details: dict = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,67 @@ def __init__(self, config: Configuration):
self.__config = config

def meta_jobs_delayed_jobs_min_delay_seconds(self):
    """
    Gets the minimum delay, in seconds, before a delayed job is executed.

    :return: the minimum number of seconds a delayed job waits

    :seealso: `META_JOBS_DELAYED_JOBS_MIN_DELAY_SECONDS <https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_configuration.py#L90>`
    """
    min_delay = self.__config.get_value(META_JOBS_DELAYED_JOBS_MIN_DELAY_SECONDS)
    return min_delay

def meta_jobs_delayed_jobs_randomized_added_delay_seconds(self):
    """
    Gets the randomized extra delay, in seconds, added on top of the
    minimum delay of a delayed job.

    :return: the number of seconds of additional randomized delay

    :seealso: `META_JOBS_DELAYED_JOBS_RANDOMIZED_ADDED_DELAY_SECONDS <https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_configuration.py#L100>`
    """
    added_delay = self.__config.get_value(
        META_JOBS_DELAYED_JOBS_RANDOMIZED_ADDED_DELAY_SECONDS
    )
    return added_delay

def meta_jobs_dag_execution_check_time_period_seconds(self):
    """
    Gets how often, in seconds, the status of a DAG execution is checked.

    :return: the check period in seconds for a DAG execution's status

    :seealso: `META_JOBS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS <https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_configuration.py#L111>`
    """
    check_period = self.__config.get_value(
        META_JOBS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS
    )
    return check_period

def meta_jobs_time_between_status_check_seconds(self):
    """
    Gets the interval, in seconds, between consecutive status checks of a job.

    :return: the number of seconds between job status checks

    :seealso: `META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS <https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_configuration.py#L121>`
    """
    interval = self.__config.get_value(META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS)
    return interval

def meta_jobs_max_concurrent_running_jobs(self):
    """
    Gets the upper limit on the number of jobs that may run concurrently.

    :return: the maximum number of concurrently running jobs

    :seealso: `META_JOBS_MAX_CONCURRENT_RUNNING_JOBS <https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_configuration.py#L132>`
    """
    max_jobs = self.__config.get_value(META_JOBS_MAX_CONCURRENT_RUNNING_JOBS)
    return max_jobs


def add_definitions(config_builder: ConfigurationBuilder):
"""
Defines what configuration settings are needed for the DAGs plugin with reasonable defaults.

:param config_builder: the builder used to add the configuration variables
:return:
"""
config_builder.add(
key=META_JOBS_DELAYED_JOBS_MIN_DELAY_SECONDS,
default_value=30,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@


class MetaJobsDag:
def __init__(
self,
team_name: str,
meta_config: MetaPluginConfiguration,
):
def __init__(self, team_name: str, meta_config: MetaPluginConfiguration):
"""
This class handles all DAG-related operations, such as building and executing the DAG.

:param team_name: the name of the owning team
:param meta_config: the DAG job configuration
"""
self._team_name = team_name
self._topological_sorter = TopologicalSorter()
self._delayed_starting_jobs = TimeBasedQueue(
Expand All @@ -47,6 +49,12 @@ def __init__(
self._dag_validator = DagValidator()

def build_dag(self, jobs: List[Dict]):
"""
Validate the jobs and build a DAG based on their dependency lists.

:param jobs: the jobs that are part of the DAG
:return:
"""
self._dag_validator.validate(jobs)
for job in jobs:
trackable_job = TrackableJob(
Expand All @@ -59,6 +67,11 @@ def build_dag(self, jobs: List[Dict]):
self._topological_sorter.add(trackable_job.job_name, *job["depends_on"])

def execute_dag(self):
"""
Execute the DAG of jobs.

:return:
"""
self._topological_sorter.prepare()
while self._topological_sorter.is_active():
for node in self._topological_sorter.get_ready():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,17 @@ def get_json(obj):


class MetaJobInput(IMetaJobInput):
"""
This class is responsible for the execution of a DAG of Data Jobs.
"""

def run_meta_job(self, jobs: List[Dict]):
"""
Runs the DAG of jobs - initializes it, builds it, executes it and logs the summary.

:param jobs: the list of jobs that are part of the DAG
:return:
"""
dag = MetaJobsDag(TEAM_NAME, META_CONFIG)
dag.build_dag(jobs)
dag.execute_dag()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class JobStatus(str, Enum):

class RemoteDataJob:
"""
Interact with Verstile Data Kit (VDK) Control Service
Interact with Versatile Data Kit (VDK) Control Service
"""

def __init__(
Expand All @@ -54,6 +54,15 @@ def __init__(
timeout: int = 5, # TODO: Set reasonable default
**kwargs,
) -> None:
"""

:param job_name: the name of the job
:param team_name: the name of the owning team
:param rest_api_url: the Control Service REST API URL
:param arguments: the job arguments
:param timeout: timeout
:param kwargs: extra parameters
"""
self.__job_name = job_name
self.__team_name = team_name
self.__rest_api_url = rest_api_url
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@


class RemoteDataJobExecutor(IDataJobExecutor):
"""
This class is responsible for executing remote Data Jobs.
"""

def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None):
vdk_cfg = VDKConfig()
job = RemoteDataJob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ def __init__(
self._dequeue_timeout_seconds = dequeue_timeout_seconds

def enqueue(self, element):
"""
Adds an element to the queue.

:param element: the element to be added to the queue
:type element: typing.Any
:return:
"""
ready_time = self._min_ready_time_seconds + randint(
0, self._randomize_delay_seconds
)
Expand All @@ -58,6 +65,11 @@ def dequeue(self):
return None

def size(self):
    """
    Returns the current size of the queue.

    :return: the number of elements currently in the queue
    """
    return len(self._elements)

def __wait_for_entry_to_be_ready(self, ready_time: float):
Expand Down
Loading