Skip to content

Commit

Permalink
vdk-meta-jobs: improve DAGs code documentation (#1873)
Browse files Browse the repository at this point in the history
What:
Improve the DAGs code documentation by ensuring all fields and public
methods have documentation.
Add code docs where it was missing.

Testing Done: not needed as only code docs were added

Signed-off-by: Yoan Salambashev <[email protected]>
  • Loading branch information
yonitoo authored and mivanov1988 committed Apr 18, 2023
1 parent 3c7dbf6 commit 32af61e
Show file tree
Hide file tree
Showing 13 changed files with 176 additions and 23 deletions.
2 changes: 1 addition & 1 deletion projects/vdk-plugins/vdk-meta-jobs/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
packages=setuptools.find_namespace_packages(where="src"),
# This is the only vdk plugin specifc part
# Define entry point called "vdk.plugin.run" with name of plugin and module to act as entry point.
entry_points={"vdk.plugin.run": ["meta-jobs = vdk.plugin.meta_jobs.plugin_entry"]},
entry_points={"vdk.plugin.run": ["meta-jobs = vdk.plugin.meta_jobs.dags_plugin"]},
classifiers=[
"Development Status :: 2 - Pre-Alpha",
"License :: OSI Approved :: Apache Software License",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
@dataclass
class SingleJob:
"""
TODO
This class represents a single job to be executed.
:param job_name: the name of the job.
:param team_name: the name of the team that owns the job.
:param fail_dag_on_error: boolean flag indicating whether the job should be executed.
:param arguments: JSON-serializable dictionary of arguments to be passed to the job.
:param depends_on: list of names of jobs that this job depends on.
"""

job_name: str
Expand All @@ -23,18 +29,25 @@ class SingleJob:
@dataclass
class MetaJob(SingleJob):
"""
TODO
This class represents a DAG Job, which is a single job itself and consists of single jobs - the orchestrated ones.
:param jobs: list of the orchestrated jobs
"""

jobs: List[SingleJob] = field(default_factory=list)


class IMetaJobInput(abc.ABC):
"""
This class is responsible for the DAG job run.
"""

@abstractmethod
def run_meta_job(self, meta_job: MetaJob):
"""
TODO
:param meta_job:
Runs the given DAG job.
:param dag: the DAG job to be run
:return:
"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,41 @@


class TrackingDataJobExecutor:
"""
The purpose of this class is to execute Data Jobs, track them and change their
statuses accordingly.
"""

def __init__(
self, executor: IDataJobExecutor, time_between_status_check_seconds: int
):
"""
:param executor: the Data Job executor
:param time_between_status_check_seconds: the number of seconds between status check
"""
self._executor = executor
self._jobs_cache: Dict[str, TrackableJob] = dict()
self._time_between_status_check_seconds = time_between_status_check_seconds

def register_job(self, job: TrackableJob):
"""
Registers a Data Job by adding it to the cache.
:param job: the job to be added to the cache
:return:
"""
if job.job_name in self._jobs_cache:
log.warning(
f"Job with name {job.job_name} aleady exists. Details: {self._jobs_cache[job.job_name]}. "
f"Job with name {job.job_name} already exists. Details: {self._jobs_cache[job.job_name]}. "
f"Will overwrite it."
)
self._jobs_cache[job.job_name] = job

def start_job(self, job_name: str) -> None:
"""
Starts a Data Job.
:param job_name: the job to start and track
"""
job = self.__get_job(job_name)
Expand All @@ -57,6 +75,12 @@ def start_job(self, job_name: str) -> None:
)

def finalize_job(self, job_name):
"""
Finalizes a finished job by updating its details and logging them or raising an error.
:param job_name: the name of the job
:return:
"""
job = self.__get_job(job_name)
details = self._executor.details_job(
job.job_name, job.team_name, job.execution_id
Expand Down Expand Up @@ -100,6 +124,12 @@ def __is_job_submitted(job: TrackableJob):
return job.status is not None

def status(self, job_name: str) -> str:
"""
Gets the status of a job.
:param job_name: the name of the job
:return: the job status
"""
job = self.__get_job(job_name)
if job.status in ACTIVE_JOB_STATUSES:
job.status = self._executor.status_job(
Expand All @@ -109,6 +139,9 @@ def status(self, job_name: str) -> str:
return job.status

def get_finished_job_names(self):
"""
:return: list of the names of all the finalized jobs
"""
finalized_jobs = []
# TODO: optimize
# Do not call the status every time (use TTL caching)
Expand All @@ -130,9 +163,15 @@ def get_finished_job_names(self):
return finalized_jobs

def get_all_jobs(self):
"""
:return: list of all jobs
"""
return list(self._jobs_cache.values())

def get_currently_running_jobs(self):
"""
:return: list of jobs with current status SUBMITTED or RUNNING
"""
return [j for j in self._jobs_cache.values() if j.status in ACTIVE_JOB_STATUSES]

def start_new_job_execution(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class DagValidator:
def validate(self, jobs: List[Dict]):
"""
Validate the structure and the order of the DAG of Data Jobs.
:param jobs: List of Data Jobs (DAG vertices) to be validated
:return:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def run_job(context: JobContext) -> None:
@staticmethod
@hookimpl
def vdk_configure(config_builder: ConfigurationBuilder) -> None:
"""
Here we define what configuration settings are needed for DAGs with reasonable defaults.
"""
add_definitions(config_builder)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@


class IDataJobExecutor(abc.ABC):
"""
This module is responsible for the execution of Data Jobs.
"""

@abc.abstractmethod
def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None):
"""
Expand Down Expand Up @@ -60,6 +64,10 @@ def job_executions_list(

@dataclass
class TrackableJob(meta_job.SingleJob):
"""
This class provides the ability to track status details of Data Job during execution.
"""

status: str = None
execution_id: str = None
details: dict = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,67 @@ def __init__(self, config: Configuration):
self.__config = config

def meta_jobs_delayed_jobs_min_delay_seconds(self):
"""
Returns the minimum delay time for a delayed job to be executed in seconds.
:return: the number of seconds for the minimum delay of a delayed job
:seealso: `META_JOBS_DELAYED_JOBS_MIN_DELAY_SECONDS <https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_configuration.py#L90>`
"""
return self.__config.get_value(META_JOBS_DELAYED_JOBS_MIN_DELAY_SECONDS)

def meta_jobs_delayed_jobs_randomized_added_delay_seconds(self):
"""
Returns the additional randomized delay time in seconds to the minimum delay time of a delayed job.
:return: the number of seconds for the additional randomized delay of the delayed jobs
:seealso: `META_JOBS_DELAYED_JOBS_RANDOMIZED_ADDED_DELAY_SECONDS <https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_configuration.py#L100>`
"""
return self.__config.get_value(
META_JOBS_DELAYED_JOBS_RANDOMIZED_ADDED_DELAY_SECONDS
)

def meta_jobs_dag_execution_check_time_period_seconds(self):
"""
Returns the frequency at which the system checks a DAG execution's status.
:return: the frequency in seconds at which the system checks a DAG execution's status
:seealso: `META_JOBS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS <https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_configuration.py#L111>`
"""
return self.__config.get_value(
META_JOBS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS
)

def meta_jobs_time_between_status_check_seconds(self):
"""
Returns the time interval in seconds between status checks for a job.
:return: the number of seconds between status checks for a job.
:seealso: `META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS <https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_configuration.py#L121>`
"""
return self.__config.get_value(META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS)

def meta_jobs_max_concurrent_running_jobs(self):
"""
Returns the limit of concurrent running jobs.
:return: the number of maximum concurrent running jobs
:seealso: `META_JOBS_MAX_CONCURRENT_RUNNING_JOBS <https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_configuration.py#L132>`
"""
return self.__config.get_value(META_JOBS_MAX_CONCURRENT_RUNNING_JOBS)


def add_definitions(config_builder: ConfigurationBuilder):
"""
Defines what configuration settings are needed for the DAGs plugin with reasonable defaults.
:param config_builder: the builder used to add the configuration variables
:return:
"""
config_builder.add(
key=META_JOBS_DELAYED_JOBS_MIN_DELAY_SECONDS,
default_value=30,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@


class MetaJobsDag:
def __init__(
self,
team_name: str,
meta_config: MetaPluginConfiguration,
):
def __init__(self, team_name: str, meta_config: MetaPluginConfiguration):
"""
This module deals with all the DAG-related operations such as build and execute.
:param team_name: the name of the owning team
:param meta_config: the DAG job configuration
"""
self._team_name = team_name
self._topological_sorter = TopologicalSorter()
self._delayed_starting_jobs = TimeBasedQueue(
Expand All @@ -47,6 +49,12 @@ def __init__(
self._dag_validator = DagValidator()

def build_dag(self, jobs: List[Dict]):
"""
Validate the jobs and build a DAG based on their dependency lists.
:param jobs: the jobs that are part of the DAG
:return:
"""
self._dag_validator.validate(jobs)
for job in jobs:
trackable_job = TrackableJob(
Expand All @@ -59,6 +67,11 @@ def build_dag(self, jobs: List[Dict]):
self._topological_sorter.add(trackable_job.job_name, *job["depends_on"])

def execute_dag(self):
"""
Execute the DAG of jobs.
:return:
"""
self._topological_sorter.prepare()
while self._topological_sorter.is_active():
for node in self._topological_sorter.get_ready():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,17 @@ def get_json(obj):


class MetaJobInput(IMetaJobInput):
"""
This module is responsible for the execution of DAG of Data Jobs.
"""

def run_meta_job(self, jobs: List[Dict]):
"""
Runs the DAG of jobs - initializes it, builds it, executes it and logs the summary.
:param jobs: the list of jobs that are part of the DAG
:return:
"""
dag = MetaJobsDag(TEAM_NAME, META_CONFIG)
dag.build_dag(jobs)
dag.execute_dag()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class JobStatus(str, Enum):

class RemoteDataJob:
"""
Interact with Verstile Data Kit (VDK) Control Service
Interact with Versatile Data Kit (VDK) Control Service
"""

def __init__(
Expand All @@ -54,6 +54,15 @@ def __init__(
timeout: int = 5, # TODO: Set reasonable default
**kwargs,
) -> None:
"""
:param job_name: the name of the job
:param team_name: the name of the owning team
:param rest_api_url: the Control Service REST API URL
:param arguments: the job arguments
:param timeout: timeout
:param kwargs: extra parameters
"""
self.__job_name = job_name
self.__team_name = team_name
self.__rest_api_url = rest_api_url
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@


class RemoteDataJobExecutor(IDataJobExecutor):
"""
This module is responsible for executing remote Data Jobs.
"""

def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None):
vdk_cfg = VDKConfig()
job = RemoteDataJob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ def __init__(
self._dequeue_timeout_seconds = dequeue_timeout_seconds

def enqueue(self, element):
"""
Adds an element to the queue.
:param element: the element to be added to the queue
:type typing.Any
:return:
"""
ready_time = self._min_ready_time_seconds + randint(
0, self._randomize_delay_seconds
)
Expand All @@ -58,6 +65,11 @@ def dequeue(self):
return None

def size(self):
"""
Returns the size of the queue.
:return:
"""
return len(self._elements)

def __wait_for_entry_to_be_ready(self, ready_time: float):
Expand Down
Loading

0 comments on commit 32af61e

Please sign in to comment.