Skip to content

Commit

Permalink
vdk-dag: add local executor
Browse files Browse the repository at this point in the history
In order to be able to test locally a full dag, we need a local executor
which will run the data jobs locally.

The way it works: the user sets `DAGS_JOB_EXECUTOR_TYPE=local` and must
ensure that all jobs are located either at the same level as the DAG job
or in the current working directory so that they can be found.
  • Loading branch information
antoniivanov committed Feb 9, 2024
1 parent e978119 commit 5d3d2ea
Show file tree
Hide file tree
Showing 13 changed files with 415 additions and 4 deletions.
17 changes: 17 additions & 0 deletions projects/vdk-plugins/vdk-dag/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,23 @@ There are 3 types of jobs right now in terms of how are they started.
* If a DAG job tries to start a job and there is already running such job, the approach of the DAG job would be
similar to the schedule - retry later but more times.

### Local run of DAG jobs

By default, the DAG executes jobs that are deployed in the VDK Control Service.
Those are jobs deployed (usually using the vdk deploy command) in a managed environment by the VDK Control Service.
We will call this "remote" execution.

If you want to test or debug the DAG, you need to run all jobs locally (on your machine only).
This is possible by setting the following configuration, either as an environment variable or in the DAG job's config.ini file:

```ini
DAGS_JOB_EXECUTOR_TYPE=local
```

This will run all jobs from the local file system. The orchestrated jobs will be searched for either:
- in the current working directory (where you executed the `vdk run dag-job` command)
- or at the same level as the DAG job directory.

### Configuration

The configuration variables of the VDK DAGs can be checked by running the command:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,11 @@ def finalize_job(self, job_name):
@staticmethod
def __get_printable_details(details):
    """Render execution details as pretty-printed JSON for logging.

    The noisy "deployment" entry is omitted. Objects that are not JSON
    serializable are rendered via their __dict__ (when available) or their
    string representation.

    :param details: execution details dictionary.
    :return: JSON string suitable for logging.
    """
    # Build a filtered copy instead of `del details["deployment"]`:
    # avoids mutating the caller's dict and a KeyError if the key is absent.
    printable = {key: value for key, value in details.items() if key != "deployment"}
    return json.dumps(
        printable,
        default=lambda o: o.__dict__ if hasattr(o, "__dict__") else str(o),
        indent=2,
    )

def __get_job(self, job_name) -> TrackableJob:
job: TrackableJob = self._jobs_cache.get(job_name)
Expand Down Expand Up @@ -196,7 +200,7 @@ def start_new_job_execution(
job_name: str,
team_name: str,
started_by: str = None,
arguments: IJobArguments = None,
arguments: dict = None,
) -> str:
"""
Start a new data job execution.
Expand Down
13 changes: 12 additions & 1 deletion projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,23 @@
from vdk.plugin.dag.dag_plugin_configuration import DagPluginConfiguration
from vdk.plugin.dag.dag_validator import DagValidator
from vdk.plugin.dag.dags import TrackableJob
from vdk.plugin.dag.local_executor import LocalDataJobExecutor
from vdk.plugin.dag.remote_data_job_executor import RemoteDataJobExecutor
from vdk.plugin.dag.time_based_queue import TimeBasedQueue

log = logging.getLogger(__name__)


def get_job_executor(executor_type: str):
    """Create the data job executor for the configured executor type.

    :param executor_type: case-insensitive executor type; 'remote' runs jobs
        deployed in the Control Service, 'local' runs jobs from the local
        file system.
    :return: a RemoteDataJobExecutor or LocalDataJobExecutor instance.
    :raises ValueError: if executor_type is not 'remote' or 'local'.
    """
    # Normalize once instead of calling .lower() per comparison.
    normalized_type = executor_type.lower()
    if normalized_type == "remote":
        return RemoteDataJobExecutor()
    if normalized_type == "local":
        return LocalDataJobExecutor()
    raise ValueError(
        f"Unsupported executor type: {executor_type}. Must be either remote or local."
    )


class Dag:
def __init__(
self,
Expand Down Expand Up @@ -49,7 +60,7 @@ def __init__(
dags_config.dags_dag_execution_check_time_period_seconds()
)
self._job_executor = TrackingDataJobExecutor(
executor=RemoteDataJobExecutor(),
executor=get_job_executor(dags_config.dags_job_executor_type()),
time_between_status_check_seconds=dags_config.dags_time_between_status_check_seconds(),
)
self._dag_validator = DagValidator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
DAGS_TIME_BETWEEN_STATUS_CHECK_SECONDS = "DAGS_TIME_BETWEEN_STATUS_CHECK_SECONDS"
DAGS_MAX_CONCURRENT_RUNNING_JOBS = "DAGS_MAX_CONCURRENT_RUNNING_JOBS"

DAGS_JOB_EXECUTOR_TYPE = "DAGS_JOB_EXECUTOR_TYPE"


class DagPluginConfiguration:
def __init__(self, config: Configuration):
Expand Down Expand Up @@ -68,6 +70,9 @@ def dags_max_concurrent_running_jobs(self):
"""
return self.__config.get_value(DAGS_MAX_CONCURRENT_RUNNING_JOBS)

def dags_job_executor_type(self):
    """Return the configured DAGs job executor type.

    Controlled by the DAGS_JOB_EXECUTOR_TYPE configuration key;
    expected values are 'remote' (the default) and 'local'.
    """
    return self.__config.get_value(DAGS_JOB_EXECUTOR_TYPE)


def add_definitions(config_builder: ConfigurationBuilder):
"""
Expand Down Expand Up @@ -128,3 +133,13 @@ def add_definitions(config_builder: ConfigurationBuilder):
"resources. It's advisable to use the default value for this variable."
),
)
# Register the executor-type option. The description parts carry explicit
# separators: Python's implicit string concatenation would otherwise join
# the sentences with no space (e.g. "...within the DAGThere are...").
config_builder.add(
    key=DAGS_JOB_EXECUTOR_TYPE,
    default_value="remote",
    description=(
        "The job executor to use when running the jobs within the DAG. "
        "There are two possible values: 'remote' and 'local'. "
        "The 'remote' executor runs the jobs deployed in the Control Service. "
        "The 'local' executor will try to find the jobs in the current working directory."
    ),
)
2 changes: 1 addition & 1 deletion projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def start_job(
job_name: str,
team_name: str,
started_by: str = None,
arguments: IJobArguments = None,
arguments: dict = None,
):
"""
Start an execution of a data job and returns its execution id
Expand Down
205 changes: 205 additions & 0 deletions projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import os
import pathlib
import pprint

Check notice on line 6 in projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py#L6

'pprint' imported but unused (F401)

Check warning on line 6 in projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py#L6

Unused import pprint
import shlex
import subprocess

Check warning on line 8 in projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py#L8

Consider possible security implications associated with the subprocess module.
import tempfile
from datetime import datetime
from typing import Dict
from typing import List
from typing import Optional

from taurus_datajob_api import DataJobExecution
from vdk.api.job_input import IJobArguments

Check notice on line 16 in projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py#L16

'vdk.api.job_input.IJobArguments' imported but unused (F401)

Check warning on line 16 in projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py#L16

Unused IJobArguments imported from vdk.api.job_input
from vdk.internal.builtin_plugins.run.run_status import ExecutionStatus
from vdk.internal.builtin_plugins.run.summary_output import JobSummaryParser
from vdk.internal.core.error_classifiers import ResolvableBy
from vdk.plugin.dag.dags import IDataJobExecutor
from vdk.plugin.dag.remote_data_job import JobStatus

RUNNING_STATUSES = [JobStatus.RUNNING.value, JobStatus.SUBMITTED.value]


class LocalDataJob:
    """A single local data job run, executed as a ``vdk run`` subprocess.

    Logs and the machine-readable run summary are written to a per-job
    directory under the system temp directory so they can be inspected
    after the run.
    """

    def __init__(self, job_path: str, job_name: str, team_name: str, arguments: dict):
        """
        :param job_path: directory of the data job on the local file system.
        :param job_name: name of the data job.
        :param team_name: owning team (kept for parity with remote execution).
        :param arguments: optional job arguments passed to ``vdk run``.
        """
        self._job_name = job_name
        self._team_name = team_name
        self._job_path = job_path
        self._arguments = arguments

        temp_dir = os.path.join(tempfile.gettempdir(), "vdk-jobs", job_name)
        pathlib.Path(temp_dir).mkdir(parents=True, exist_ok=True)
        self._log_file = os.path.join(temp_dir, "run.log")
        self._summary_file = os.path.join(temp_dir, "run-summary.json")
        self._log_file_handle = None
        self._process = None
        self._start_time = None
        self._end_time = None
        self._message = None

    def close(self):
        """Release the log file handle. Safe to call more than once."""
        if self._log_file_handle:
            self._log_file_handle.close()
            # Reset so repeated close() calls are harmless no-ops.
            self._log_file_handle = None

    @staticmethod
    def __prepare_vdk_run_command(path: str, arguments: dict) -> list:
        """Build the argv list for ``vdk run``.

        The command is executed with shell=False and a list of arguments, so
        each element must be the literal value. Shell quoting (shlex.quote)
        must NOT be applied here -- it would embed literal quote characters
        and break paths containing spaces or special characters.
        """
        cmd = ["vdk", "run", str(path)]
        if arguments:
            cmd.append("--arguments")
            cmd.append(json.dumps(arguments))
        return cmd

    def start_run(self):
        """Start the job subprocess and return its PID as the execution id."""
        cmd = self.__prepare_vdk_run_command(self._job_path, self._arguments)

        environ_copy = os.environ.copy()
        # Tell vdk where to write the run summary that status() later parses.
        environ_copy["JOB_RUN_SUMMARY_FILE_PATH"] = self._summary_file

        self._log_file_handle = pathlib.Path(self._log_file).open(mode="w+")

        # shell=False with an argv list: no shell-injection surface.
        self._process = subprocess.Popen(
            cmd,
            stdout=self._log_file_handle,
            stderr=self._log_file_handle,
            env=environ_copy,
            shell=False,
        )
        self._start_time = datetime.now()
        self._message = {"logs": self._log_file}
        return str(self._process.pid)

    def details(self) -> Dict:
        """Return execution details in the same shape the remote executor uses."""
        details = dict(
            start_time=self._start_time,
            end_time=self._end_time,
            status=self.status(),
            message=self._message,
            started_by="",
            type="local",
            # Kept for schema parity with remote executions; callers strip it
            # before printing.
            deployment=None,
        )
        return details

    def status(self) -> str:
        """Compute the current status of the run.

        Values mirror the remote JobStatus enum so DAG tracking code can
        treat local and remote executions uniformly.
        """
        if not self._process:
            return JobStatus.SUBMITTED.value

        exit_code = self._process.poll()
        if exit_code is None:
            return JobStatus.RUNNING.value

        if not self._end_time:
            self._end_time = datetime.now()

        if os.path.exists(self._summary_file):
            content = pathlib.Path(self._summary_file).read_text()
            job_summary = JobSummaryParser.from_json(content)

            if job_summary.status == ExecutionStatus.SUCCESS:
                return JobStatus.SUCCEEDED.value
            if job_summary.status == ExecutionStatus.SKIP_REQUESTED:
                return JobStatus.SKIPPED.value

            # Any other summary status indicates failure; expose the summary
            # in the message for easier troubleshooting.
            self._message = {"summary": json.loads(content), "logs": self._log_file}

            if job_summary.blamee == ResolvableBy.USER_ERROR:
                return JobStatus.USER_ERROR.value
            if job_summary.blamee == ResolvableBy.CONFIG_ERROR:
                # NOTE(review): config errors are surfaced as user errors;
                # confirm this mapping is intentional.
                return JobStatus.USER_ERROR.value

            if job_summary.blamee == ResolvableBy.PLATFORM_ERROR:
                return JobStatus.PLATFORM_ERROR.value
        else:
            # No summary file was produced: fall back to the exit code.
            if exit_code != 0:
                self._message = {
                    "summary": {"exit_code": exit_code},
                    "logs": self._log_file,
                }
                return JobStatus.PLATFORM_ERROR.value
            else:
                return JobStatus.SUCCEEDED.value

        # Reached only if the summary reports failure with an unknown blamee.
        raise Exception("Bug! That code path should not happen")


class LocalDataJobRunException(Exception):
    """Raised when a local data job run cannot be started or located."""

    def __init__(self, job_name: str, team_name: str, message: str):
        # Keep the identifying fields accessible to callers before
        # delegating the formatted message to the base Exception.
        self.job_name = job_name
        self.team_name = team_name
        super().__init__(f"Data Job {job_name} of team {team_name}: {message}")


class LocalDataJobExecutor(IDataJobExecutor):
    """Executor that runs data jobs as subprocesses on the local machine.

    Jobs are looked up by name on the local file system: in the current
    working directory and in any extra directories listed (colon-separated)
    in the DAG_LOCAL_RUN_JOB_PATH environment variable.
    """

    def __init__(self):
        # job name -> local job (running or finished). Entries are never
        # removed, which is why the same job cannot be started twice.
        self._running_jobs: Dict[str, LocalDataJob] = dict()

    @staticmethod
    def __find_job_path(job_name: str):
        """Return the job's directory or raise if it cannot be found.

        :raises LocalDataJobRunException: when no candidate directory
            contains a directory named after the job.
        """
        candidates = [
            os.getcwd(),
        ]
        candidates += [
            part
            for part in os.environ.get("DAG_LOCAL_RUN_JOB_PATH", "").split(":")
            if part
        ]

        for candidate in candidates:
            candidate_job_path = os.path.join(candidate, job_name)
            if os.path.isdir(candidate_job_path):
                return candidate_job_path

        raise LocalDataJobRunException(
            job_name, "", f"Cannot find the job directory. Search paths: {candidates}"
        )

    def start_job(
        self,
        job_name: str,
        team_name: str,
        started_by: str = None,
        arguments: dict = None,
    ):
        """Start a local run of the given job and return its execution id (PID).

        :raises LocalDataJobRunException: if the job was already started by
            this executor or its directory cannot be found.
        """
        if job_name in self._running_jobs:
            raise LocalDataJobRunException(
                job_name,
                team_name,
                "Job run already has been started. Cannot start same job twice.",
            )
        job_path = self.__find_job_path(job_name)

        job = LocalDataJob(
            job_path,
            job_name,
            team_name,
            arguments,
        )
        self._running_jobs[job_name] = job
        return job.start_run()

    def status_job(self, job_name: str, team_name: str, execution_id: str) -> str:
        """Return the job's status, or None for jobs never started here."""
        if job_name in self._running_jobs:
            return self._running_jobs.get(job_name).status()
        else:
            return None  # TODO: or raise ?

    def details_job(self, job_name: str, team_name: str, execution_id: str) -> dict:
        """Return execution details, or an empty dict for unknown jobs."""
        if job_name in self._running_jobs:
            return self._running_jobs.get(job_name).details()
        else:
            return dict()  # TODO: or raise ?

    def job_executions_list(
        self, job_name: str, team_name: str
    ) -> Optional[List[DataJobExecution]]:
        # Local runs are not persisted, so there is no execution history.
        return []
7 changes: 7 additions & 0 deletions projects/vdk-plugins/vdk-dag/tests/jobs/fail-job/1_step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    # Test fixture: this step always fails so the DAG tests can exercise
    # error handling (e.g. the fail_dag_on_error=False path).
    raise ArithmeticError("cannot do math :(")
2 changes: 2 additions & 0 deletions projects/vdk-plugins/vdk-dag/tests/jobs/local-dag/config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[owner]
team = team-awesome
28 changes: 28 additions & 0 deletions projects/vdk-plugins/vdk-dag/tests/jobs/local-dag/dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import tempfile

from vdk.api.job_input import IJobInput
from vdk.plugin.dag.dag_runner import DagInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """Test DAG: a writer job, a deliberately failing job, and a reader job."""
    log.info(f"Dummy arguments {job_input.get_arguments()}")
    with tempfile.NamedTemporaryFile(prefix="temp-", mode="w+") as temp_file:
        writer_job = {
            "job_name": "write-file-job",
            "depends_on": [],
            "arguments": {"path": temp_file.name},
        }
        failing_job = {
            "job_name": "fail-job",
            "depends_on": ["write-file-job"],
            "fail_dag_on_error": False,
        }
        reader_job = {
            "job_name": "read-file-job",
            "depends_on": ["write-file-job"],
            "arguments": {"path": temp_file.name},
        }
        DagInput().run_dag([writer_job, failing_job, reader_job])
14 changes: 14 additions & 0 deletions projects/vdk-plugins/vdk-dag/tests/jobs/read-file-job/1_step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import pathlib

from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    """Verify that the file produced by write-file-job contains 'data'."""
    path = job_input.get_arguments().get("path")
    if not path:
        raise ValueError("No path provided.")
    if pathlib.Path(path).read_text() != "data":
        raise ValueError(f"Expected file {path} content to be 'data'")
11 changes: 11 additions & 0 deletions projects/vdk-plugins/vdk-dag/tests/jobs/write-file-job/1_step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import pathlib

from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    """Write the marker content 'data' to the path passed in the job arguments."""
    path = job_input.get_arguments().get("path")
    if not path:
        return
    pathlib.Path(path).write_text("data")
Loading

0 comments on commit 5d3d2ea

Please sign in to comment.