vdk-dag: add local executor (#3097)
To be able to test a full DAG locally, we need a local executor that runs the data jobs locally.

The way it works: the user sets `DAGS_JOB_EXECUTOR_TYPE=local` and must ensure all jobs are located either at the same level as the DAG job or in the current working directory so that they can be found.
antoniivanov authored Feb 10, 2024
1 parent 1402f3a commit 7c993c9
Showing 13 changed files with 424 additions and 4 deletions.
17 changes: 17 additions & 0 deletions projects/vdk-plugins/vdk-dag/README.md
@@ -146,6 +146,23 @@ There are 3 types of jobs right now in terms of how they are started.
* If a DAG job tries to start a job that is already running, the DAG job's approach is similar to the scheduled start - retry later, but more times.

### Local run of DAG jobs

By default, the DAG executes jobs that have been deployed by the VDK Control Service.
Those are jobs deployed (usually using the `vdk deploy` command) in a managed environment by the VDK Control Service.
We will call this "remote" execution.

If you want to test or debug the DAG, you need to run all jobs locally (on your machine only).
This is possible by setting the following configuration, either as an environment variable or in the DAG job's config.ini file:

```
DAGS_JOB_EXECUTOR_TYPE=local
```

This will run all jobs from the local file system. The orchestrated jobs will be searched for either:
- in the current working directory (where you executed the `vdk run dag-job` command)
- or at the same level as the DAG job directory.
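
For illustration, consider a hypothetical layout (job names are made up) where running `vdk run my-dag-job` from the `jobs/` directory lets the local executor find every orchestrated job, since they sit both in the current working directory and at the same level as the DAG job:

```
jobs/
├── my-dag-job/     # the DAG job you run with `vdk run my-dag-job`
├── ingest-job/     # orchestrated job, discoverable in the working directory
└── publish-job/    # orchestrated job, discoverable in the working directory
```

Additional colon-separated search paths can also be supplied via the `DAG_LOCAL_RUN_JOB_PATH` environment variable (see the local executor implementation below).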

### Configuration

The configuration variables of the VDK DAGs can be checked by running the command:
@@ -111,7 +111,11 @@ def finalize_job(self, job_name):
@staticmethod
def __get_printable_details(details):
del details["deployment"]
return json.dumps(details, default=lambda o: str(o), indent=2)
return json.dumps(
details,
default=lambda o: o.__dict__ if hasattr(o, "__dict__") else str(o),
indent=2,
)

def __get_job(self, job_name) -> TrackableJob:
job: TrackableJob = self._jobs_cache.get(job_name)
@@ -196,7 +200,7 @@ def start_new_job_execution(
job_name: str,
team_name: str,
started_by: str = None,
arguments: IJobArguments = None,
arguments: dict = None,
) -> str:
"""
Start a new data job execution.
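The change to `__get_printable_details` above swaps the JSON `default` serializer from a plain `str(o)` to one that prefers an object's `__dict__`. A small illustration of the difference, using a hypothetical stand-in class (not from the codebase):

```python
import json


class ExecutionInfo:
    # Hypothetical stand-in for an API model object carrying attributes.
    def __init__(self):
        self.mode = "release"
        self.enabled = True


details = {"status": "succeeded", "info": ExecutionInfo()}

# With default=lambda o: str(o), "info" is rendered as an opaque
# "<__main__.ExecutionInfo object at 0x...>" string. With the new default,
# objects exposing __dict__ are expanded into readable attribute maps.
print(
    json.dumps(
        details,
        default=lambda o: o.__dict__ if hasattr(o, "__dict__") else str(o),
        indent=2,
    )
)
```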
13 changes: 12 additions & 1 deletion projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag.py
@@ -15,12 +15,23 @@
from vdk.plugin.dag.dag_plugin_configuration import DagPluginConfiguration
from vdk.plugin.dag.dag_validator import DagValidator
from vdk.plugin.dag.dags import TrackableJob
from vdk.plugin.dag.local_executor import LocalDataJobExecutor
from vdk.plugin.dag.remote_data_job_executor import RemoteDataJobExecutor
from vdk.plugin.dag.time_based_queue import TimeBasedQueue

log = logging.getLogger(__name__)


def get_job_executor(executor_type: str):
if executor_type.lower() == "remote":
return RemoteDataJobExecutor()
if executor_type.lower() == "local":
return LocalDataJobExecutor()
raise ValueError(
f"Unsupported executor type: {executor_type}. Must be either remote or local."
)


class Dag:
def __init__(
self,
@@ -49,7 +60,7 @@ def __init__(
dags_config.dags_dag_execution_check_time_period_seconds()
)
self._job_executor = TrackingDataJobExecutor(
executor=RemoteDataJobExecutor(),
executor=get_job_executor(dags_config.dags_job_executor_type()),
time_between_status_check_seconds=dags_config.dags_time_between_status_check_seconds(),
)
self._dag_validator = DagValidator()
@@ -13,6 +13,8 @@
DAGS_TIME_BETWEEN_STATUS_CHECK_SECONDS = "DAGS_TIME_BETWEEN_STATUS_CHECK_SECONDS"
DAGS_MAX_CONCURRENT_RUNNING_JOBS = "DAGS_MAX_CONCURRENT_RUNNING_JOBS"

DAGS_JOB_EXECUTOR_TYPE = "DAGS_JOB_EXECUTOR_TYPE"


class DagPluginConfiguration:
def __init__(self, config: Configuration):
@@ -68,6 +70,9 @@ def dags_max_concurrent_running_jobs(self):
"""
return self.__config.get_value(DAGS_MAX_CONCURRENT_RUNNING_JOBS)

def dags_job_executor_type(self):
return self.__config.get_value(DAGS_JOB_EXECUTOR_TYPE)


def add_definitions(config_builder: ConfigurationBuilder):
"""
@@ -128,3 +133,13 @@ def add_definitions(config_builder: ConfigurationBuilder):
"resources. It's advisable to use the default value for this variable."
),
)
config_builder.add(
key=DAGS_JOB_EXECUTOR_TYPE,
default_value="remote",
description=(
"The job executor to use when running the jobs within the DAG"
"There are two possible values : 'remote' and 'local'."
"'remote' executor would run the jobs deployed in the control service."
"'local' executor will try to find the jobs in the current working directory"
),
)
2 changes: 1 addition & 1 deletion projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dags.py
@@ -21,7 +21,7 @@ def start_job(
job_name: str,
team_name: str,
started_by: str = None,
arguments: IJobArguments = None,
arguments: dict = None,
):
"""
Start an execution of a data job and return its execution id
215 changes: 215 additions & 0 deletions projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py
@@ -0,0 +1,215 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import os
import pathlib
import subprocess
import tempfile
from datetime import datetime
from typing import Dict
from typing import List
from typing import Optional

from taurus_datajob_api import DataJobExecution
from vdk.internal.builtin_plugins.run.run_status import ExecutionStatus
from vdk.internal.builtin_plugins.run.summary_output import JobSummaryParser
from vdk.internal.core.error_classifiers import ResolvableBy
from vdk.plugin.dag.dags import IDataJobExecutor
from vdk.plugin.dag.remote_data_job import JobStatus

RUNNING_STATUSES = [JobStatus.RUNNING.value, JobStatus.SUBMITTED.value]


class LocalDataJob:
def __init__(self, job_path: str, job_name: str, team_name: str, arguments: dict):
self._job_name = job_name
self._team_name = team_name
self._job_path = job_path
self._arguments = arguments

temp_dir = os.path.join(tempfile.gettempdir(), "vdk-jobs", job_name)
pathlib.Path(temp_dir).mkdir(parents=True, exist_ok=True)
self._log_file = os.path.join(temp_dir, "run.log")
self._summary_file = os.path.join(temp_dir, "run-summary.json")
self._log_file_handle = None
self._process = None
self._start_time = None
self._end_time = None
self._message = None

def close(self):
if self._log_file_handle:
self._log_file_handle.close()
self._log_file_handle = None
if self._process:
# Ensure the process has finished before closing
self._process.wait()
self._process = None

def __del__(self):
self.close()

@staticmethod
def __prepare_vdk_run_command(path: str, arguments: dict):
# The command is executed with shell=False below, so no shell quoting is needed.
cmd: List[str] = ["vdk", "run", str(path)]
if arguments:
    cmd.append("--arguments")
    cmd.append(json.dumps(arguments))
return cmd

def start_run(self):
cmd = self.__prepare_vdk_run_command(self._job_path, self._arguments)

environ_copy = os.environ.copy()
environ_copy["JOB_RUN_SUMMARY_FILE_PATH"] = self._summary_file

self._log_file_handle = pathlib.Path(self._log_file).open(mode="w+")

self._process = subprocess.Popen(
cmd,
stdout=self._log_file_handle,
stderr=self._log_file_handle,
env=environ_copy,
shell=False,
)
self._start_time = datetime.now()
self._message = {"logs": self._log_file}
return str(self._process.pid)

def details(self) -> Dict:
details = dict(
start_time=self._start_time,
end_time=self._end_time,
status=self.status(),
message=self._message,
started_by="",
type="local",
deployment=None,
)
return details

def status(self) -> str:
if not self._process:
return JobStatus.SUBMITTED.value

result = self._process.poll()
if result is None:
return JobStatus.RUNNING.value

if not self._end_time:
self._end_time = datetime.now()

if os.path.exists(self._summary_file):
return self._determine_status_from_summary()
else:
return self._determine_status_without_summary(result)

def _determine_status_from_summary(self) -> str:
content = pathlib.Path(self._summary_file).read_text()
job_summary = JobSummaryParser.from_json(content)

if job_summary.status == ExecutionStatus.SUCCESS:
return JobStatus.SUCCEEDED.value
elif job_summary.status == ExecutionStatus.SKIP_REQUESTED:
return JobStatus.SKIPPED.value
else:
# update with summary only in case of failure
self._update_message_with_summary(content)
if job_summary.blamee in (
ResolvableBy.USER_ERROR,
ResolvableBy.CONFIG_ERROR,
):
return JobStatus.USER_ERROR.value
else:
return JobStatus.PLATFORM_ERROR.value

def _determine_status_without_summary(self, result: int) -> str:
if result != 0:
# update with summary only in case of failure
self._message = {"summary": {"exit_code": result}, "logs": self._log_file}
return JobStatus.PLATFORM_ERROR.value
else:
return JobStatus.SUCCEEDED.value

def _update_message_with_summary(self, content: str):
self._message = {"summary": json.loads(content), "logs": self._log_file}


class LocalDataJobRunException(Exception):
def __init__(self, job_name: str, team_name: str, message: str):
super().__init__(f"Data Job {job_name} of team {team_name}: {message}")
self.job_name = job_name
self.team_name = team_name


class LocalDataJobExecutor(IDataJobExecutor):
"""
This executor is responsible for running Data Jobs locally.
"""

def __init__(self):
self._running_jobs: Dict[str, LocalDataJob] = dict()

@staticmethod
def __find_job_path(job_name: str):
candidates = [
os.getcwd(),
]
candidates += [
part
for part in os.environ.get("DAG_LOCAL_RUN_JOB_PATH", "").split(":")
if part
]

for candidate in candidates:
candidate_job_path = os.path.join(candidate, job_name)
if os.path.isdir(candidate_job_path):
return candidate_job_path

raise LocalDataJobRunException(
job_name, "", f"Cannot find the job directory. Search paths: {candidates}"
)

def start_job(
self,
job_name: str,
team_name: str,
started_by: str = None,
arguments: dict = None,
):
if job_name in self._running_jobs:
raise LocalDataJobRunException(
job_name,
team_name,
f"Job run already has been started. Cannot start same job twice.",
)
job_path = self.__find_job_path(job_name)

job = LocalDataJob(
job_path,
job_name,
team_name,
arguments,
)
self._running_jobs[job_name] = job
return job.start_run()

def status_job(self, job_name: str, team_name: str, execution_id: str) -> str:
if job_name in self._running_jobs:
return self._running_jobs.get(job_name).status()
else:
return None # TODO: or raise ?

def details_job(self, job_name: str, team_name: str, execution_id: str) -> dict:
if job_name in self._running_jobs:
return self._running_jobs.get(job_name).details()
else:
return dict() # TODO: or raise ?

def job_executions_list(
self, job_name: str, team_name: str
) -> Optional[List[DataJobExecution]]:
return []
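
A minimal usage sketch of the new executor (assuming a job directory named `hello-job` exists in the current working directory; the snippet is illustrative and not part of the commit):

```python
import time

from vdk.plugin.dag.local_executor import LocalDataJobExecutor
from vdk.plugin.dag.local_executor import RUNNING_STATUSES

executor = LocalDataJobExecutor()
# Starts `vdk run hello-job` as a subprocess; the returned id is the PID.
execution_id = executor.start_job(job_name="hello-job", team_name="my-team")

# Poll until the run leaves the SUBMITTED/RUNNING states.
while executor.status_job("hello-job", "my-team", execution_id) in RUNNING_STATUSES:
    time.sleep(1)

# Details include start/end time, final status, and the run log location.
print(executor.details_job("hello-job", "my-team", execution_id))
```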
7 changes: 7 additions & 0 deletions projects/vdk-plugins/vdk-dag/tests/jobs/fail-job/1_step.py
@@ -0,0 +1,7 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
raise ArithmeticError("cannot do math :(")
2 changes: 2 additions & 0 deletions projects/vdk-plugins/vdk-dag/tests/jobs/local-dag/config.ini
@@ -0,0 +1,2 @@
[owner]
team = team-awesome
28 changes: 28 additions & 0 deletions projects/vdk-plugins/vdk-dag/tests/jobs/local-dag/dag.py
@@ -0,0 +1,28 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import tempfile

from vdk.api.job_input import IJobInput
from vdk.plugin.dag.dag_runner import DagInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
log.info(f"Dummy arguments {job_input.get_arguments()}")
with tempfile.NamedTemporaryFile(prefix="temp-", mode="w+") as temp_file:
job1 = dict(
job_name="write-file-job",
depends_on=[],
arguments=dict(path=temp_file.name),
)
job2 = dict(
job_name="fail-job", depends_on=["write-file-job"], fail_dag_on_error=False
)
job3 = dict(
job_name="read-file-job",
depends_on=["write-file-job"],
arguments=dict(path=temp_file.name),
)
DagInput().run_dag([job1, job2, job3])
14 changes: 14 additions & 0 deletions projects/vdk-plugins/vdk-dag/tests/jobs/read-file-job/1_step.py
@@ -0,0 +1,14 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import pathlib

from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
path = job_input.get_arguments().get("path")
if path:
if pathlib.Path(path).read_text() != "data":
raise ValueError(f"Expected file {path} content to be 'data'")
else:
raise ValueError("No path provided.")