vdk-dag: add local executor #3097

Merged - 1 commit - Feb 10, 2024
17 changes: 17 additions & 0 deletions projects/vdk-plugins/vdk-dag/README.md
@@ -146,6 +146,23 @@ There are 3 types of jobs right now, in terms of how they are started.
* If a DAG job tries to start a job and such a job is already running, the DAG job's approach is
  similar to the scheduled case - retry later, but more times.

### Local run of DAG jobs

By default, the DAG executes jobs that are deployed in the VDK Control Service.
Those are jobs deployed (usually with the `vdk deploy` command) in a managed environment by the VDK Control Service.
We will call this "remote" execution.

If you want to test or debug a DAG, you need to run all jobs locally (on your machine only).
This is possible by setting the following configuration, either as an environment variable or in the DAG job's config.ini file:

```ini
DAGS_JOB_EXECUTOR_TYPE=local
```
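For example, in the data job's config.ini the setting would go alongside the other vdk options; a minimal sketch, assuming the standard `[vdk]` section of a VDK job's config.ini:

```ini
[vdk]
; assumed placement - the usual section for vdk runtime options
dags_job_executor_type = local
```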

This will run all jobs from the local file system. The orchestrated jobs will be searched for either:
- in the current working directory (where you executed the `vdk run dag-job` command),
- or at the same level as the DAG job's own directory (see the layout sketch below).
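For instance, a hypothetical workspace from which a local DAG run can resolve all of its orchestrated jobs might look like this (all directory names are made up):

```
my-workspace/
├── dag-job/            # the DAG job, started with: vdk run dag-job
├── write-file-job/     # found in the current working directory
└── read-file-job/      # found next to the DAG job directory
```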

### Configuration

The configuration variables of the VDK DAGs can be checked by running the command:
@@ -111,7 +111,11 @@ def finalize_job(self, job_name):
     @staticmethod
     def __get_printable_details(details):
         del details["deployment"]
-        return json.dumps(details, default=lambda o: str(o), indent=2)
+        return json.dumps(
+            details,
+            default=lambda o: o.__dict__ if hasattr(o, "__dict__") else str(o),
+            indent=2,
+        )

     def __get_job(self, job_name) -> TrackableJob:
         job: TrackableJob = self._jobs_cache.get(job_name)
@@ -196,7 +200,7 @@ def start_new_job_execution(
         job_name: str,
         team_name: str,
         started_by: str = None,
-        arguments: IJobArguments = None,
+        arguments: dict = None,
     ) -> str:
         """
         Start a new data job execution.
13 changes: 12 additions & 1 deletion projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag.py
@@ -15,12 +15,23 @@
 from vdk.plugin.dag.dag_plugin_configuration import DagPluginConfiguration
 from vdk.plugin.dag.dag_validator import DagValidator
 from vdk.plugin.dag.dags import TrackableJob
+from vdk.plugin.dag.local_executor import LocalDataJobExecutor
 from vdk.plugin.dag.remote_data_job_executor import RemoteDataJobExecutor
 from vdk.plugin.dag.time_based_queue import TimeBasedQueue
 
 log = logging.getLogger(__name__)
 
 
+def get_job_executor(executor_type: str):
+    if executor_type.lower() == "remote":
+        return RemoteDataJobExecutor()
+    if executor_type.lower() == "local":
+        return LocalDataJobExecutor()
+    raise ValueError(
+        f"Unsupported executor type: {executor_type}. Must be either remote or local."
+    )
+
+
 class Dag:
     def __init__(
         self,
@@ -49,7 +60,7 @@ def __init__(
             dags_config.dags_dag_execution_check_time_period_seconds()
         )
         self._job_executor = TrackingDataJobExecutor(
-            executor=RemoteDataJobExecutor(),
+            executor=get_job_executor(dags_config.dags_job_executor_type()),
             time_between_status_check_seconds=dags_config.dags_time_between_status_check_seconds(),
         )
         self._dag_validator = DagValidator()
@@ -13,6 +13,8 @@
 DAGS_TIME_BETWEEN_STATUS_CHECK_SECONDS = "DAGS_TIME_BETWEEN_STATUS_CHECK_SECONDS"
 DAGS_MAX_CONCURRENT_RUNNING_JOBS = "DAGS_MAX_CONCURRENT_RUNNING_JOBS"
 
+DAGS_JOB_EXECUTOR_TYPE = "DAGS_JOB_EXECUTOR_TYPE"
+
 
 class DagPluginConfiguration:
     def __init__(self, config: Configuration):
@@ -68,6 +70,9 @@ def dags_max_concurrent_running_jobs(self):
         """
         return self.__config.get_value(DAGS_MAX_CONCURRENT_RUNNING_JOBS)
 
+    def dags_job_executor_type(self):
+        return self.__config.get_value(DAGS_JOB_EXECUTOR_TYPE)
+
 
 def add_definitions(config_builder: ConfigurationBuilder):
     """
@@ -128,3 +133,13 @@ def add_definitions(config_builder: ConfigurationBuilder):
             "resources. It's advisable to use the default value for this variable."
         ),
     )
+    config_builder.add(
+        key=DAGS_JOB_EXECUTOR_TYPE,
+        default_value="remote",
+        description=(
+            "The job executor to use when running the jobs within the DAG. "
+            "There are two possible values: 'remote' and 'local'. "
+            "The 'remote' executor runs the jobs deployed in the Control Service. "
+            "The 'local' executor searches for the jobs in the current working directory."
+        ),
+    )
2 changes: 1 addition & 1 deletion projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dags.py
@@ -21,7 +21,7 @@ def start_job(
         job_name: str,
         team_name: str,
         started_by: str = None,
-        arguments: IJobArguments = None,
+        arguments: dict = None,
     ):
         """
         Start an execution of a data job and return its execution id
215 changes: 215 additions & 0 deletions projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py
@@ -0,0 +1,215 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import os
import pathlib
import subprocess

[Codacy Static Code Analysis warning, line 7: Consider possible security implications associated with the subprocess module.]

import tempfile
from datetime import datetime
from typing import Dict
from typing import List
from typing import Optional

from taurus_datajob_api import DataJobExecution
from vdk.internal.builtin_plugins.run.run_status import ExecutionStatus
from vdk.internal.builtin_plugins.run.summary_output import JobSummaryParser
from vdk.internal.core.error_classifiers import ResolvableBy
from vdk.plugin.dag.dags import IDataJobExecutor
from vdk.plugin.dag.remote_data_job import JobStatus

RUNNING_STATUSES = [JobStatus.RUNNING.value, JobStatus.SUBMITTED.value]


class LocalDataJob:
    def __init__(self, job_path: str, job_name: str, team_name: str, arguments: dict):
        self._job_name = job_name
        self._team_name = team_name
        self._job_path = job_path
        self._arguments = arguments

        temp_dir = os.path.join(tempfile.gettempdir(), "vdk-jobs", job_name)
        pathlib.Path(temp_dir).mkdir(parents=True, exist_ok=True)
        self._log_file = os.path.join(temp_dir, "run.log")
        self._summary_file = os.path.join(temp_dir, "run-summary.json")
        self._log_file_handle = None
        self._process = None
        self._start_time = None
        self._end_time = None
        self._message = None

    def close(self):
        if self._log_file_handle:
            self._log_file_handle.close()
            self._log_file_handle = None
        if self._process:
            # Ensure the process has finished before closing
            self._process.wait()
            self._process = None

    def __del__(self):
        self.close()

    @staticmethod
    def __prepare_vdk_run_command(path: str, arguments: dict):
        # The command runs with shell=False as an argument list, so no shell
        # quoting is needed (quoting here would corrupt paths containing spaces).
        cmd: List[str] = ["vdk", "run", str(path)]
        if arguments:
            cmd.append("--arguments")
            cmd.append(json.dumps(arguments))
        return cmd
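For illustration, for a hypothetical job located at `/tmp/jobs/hello` with arguments `{"limit": 10}`, the method above would build roughly this argv list:

```python
# Illustrative only - the path and arguments are made up.
cmd = ["vdk", "run", "/tmp/jobs/hello", "--arguments", '{"limit": 10}']
```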

    def start_run(self):
        cmd = self.__prepare_vdk_run_command(self._job_path, self._arguments)

        environ_copy = os.environ.copy()
        environ_copy["JOB_RUN_SUMMARY_FILE_PATH"] = self._summary_file

        self._log_file_handle = pathlib.Path(self._log_file).open(mode="w+")

        self._process = subprocess.Popen(
            cmd,
            stdout=self._log_file_handle,
            stderr=self._log_file_handle,
            env=environ_copy,
            shell=False,
        )

[Codacy Static Code Analysis warning, line 76: subprocess call - check for execution of untrusted input.]

        self._start_time = datetime.now()
        self._message = {"logs": self._log_file}
        return str(self._process.pid)
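The returned value is the PID of the spawned `vdk run` process, which serves as the local execution id. The child process's combined stdout/stderr goes to `<tempdir>/vdk-jobs/<job-name>/run.log`, and vdk writes the machine-readable run summary to `run-summary.json` in the same directory, pointed to by the `JOB_RUN_SUMMARY_FILE_PATH` environment variable set above.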

    def details(self) -> Dict:
        details = dict(
            start_time=self._start_time,
            end_time=self._end_time,
            status=self.status(),
            message=self._message,
            started_by="",
            type="local",
            deployment=None,
        )
        return details

    def status(self) -> str:
        if not self._process:
            return JobStatus.SUBMITTED.value

        result = self._process.poll()
        if result is None:
            return JobStatus.RUNNING.value

        if not self._end_time:
            self._end_time = datetime.now()

        if os.path.exists(self._summary_file):
            return self._determine_status_from_summary()
        else:
            return self._determine_status_without_summary(result)

    def _determine_status_from_summary(self) -> str:
        content = pathlib.Path(self._summary_file).read_text()
        job_summary = JobSummaryParser.from_json(content)

        if job_summary.status == ExecutionStatus.SUCCESS:
            return JobStatus.SUCCEEDED.value
        elif job_summary.status == ExecutionStatus.SKIP_REQUESTED:
            return JobStatus.SKIPPED.value
        else:
            # update the message with the summary only in case of failure
            self._update_message_with_summary(content)
            if job_summary.blamee in (
                ResolvableBy.USER_ERROR,
                ResolvableBy.CONFIG_ERROR,
            ):
                return JobStatus.USER_ERROR.value
            else:
                return JobStatus.PLATFORM_ERROR.value

    def _determine_status_without_summary(self, result: int) -> str:
        if result != 0:
            # update the message with the summary only in case of failure
            self._message = {"summary": {"exit_code": result}, "logs": self._log_file}
            return JobStatus.PLATFORM_ERROR.value
        else:
            return JobStatus.SUCCEEDED.value

    def _update_message_with_summary(self, content: str):
        self._message = {"summary": json.loads(content), "logs": self._log_file}


class LocalDataJobRunException(Exception):
    def __init__(self, job_name: str, team_name: str, message: str):
        super().__init__(f"Data Job {job_name} of team {team_name}: {message}")
        self.job_name = job_name
        self.team_name = team_name


class LocalDataJobExecutor(IDataJobExecutor):
    """
    This class is responsible for executing local Data Jobs.
    """

    def __init__(self):
        self._running_jobs: Dict[str, LocalDataJob] = dict()

    @staticmethod
    def __find_job_path(job_name: str):
        candidates = [
            os.getcwd(),
        ]
        candidates += [
            part
            for part in os.environ.get("DAG_LOCAL_RUN_JOB_PATH", "").split(":")
            if part
        ]

        for candidate in candidates:
            candidate_job_path = os.path.join(candidate, job_name)
            if os.path.isdir(candidate_job_path):
                return candidate_job_path

        raise LocalDataJobRunException(
            job_name, "", f"Cannot find the job directory. Search paths: {candidates}"
        )
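For example, to make jobs outside the current working directory discoverable, you could set `DAG_LOCAL_RUN_JOB_PATH` to one or more colon-separated directories before the run; a minimal sketch (the paths below are hypothetical):

```python
import os

# Hypothetical extra search roots; each must contain a subdirectory
# named after the job being orchestrated.
os.environ["DAG_LOCAL_RUN_JOB_PATH"] = "/home/me/data-jobs:/opt/shared-jobs"
```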

    def start_job(
        self,
        job_name: str,
        team_name: str,
        started_by: str = None,
        arguments: dict = None,
    ):
        if job_name in self._running_jobs:
            raise LocalDataJobRunException(
                job_name,
                team_name,
                "Job run has already been started. Cannot start the same job twice.",
            )

[Codacy Static Code Analysis notice, line 187: f-string is missing placeholders (F541).]

        job_path = self.__find_job_path(job_name)

        job = LocalDataJob(
            job_path,
            job_name,
            team_name,
            arguments,
        )
        self._running_jobs[job_name] = job
        return job.start_run()

    def status_job(self, job_name: str, team_name: str, execution_id: str) -> str:
        if job_name in self._running_jobs:
            return self._running_jobs.get(job_name).status()
        else:
            return None  # TODO: or raise ?

    def details_job(self, job_name: str, team_name: str, execution_id: str) -> dict:
        if job_name in self._running_jobs:
            return self._running_jobs.get(job_name).details()
        else:
            return dict()  # TODO: or raise ?

    def job_executions_list(
        self, job_name: str, team_name: str
    ) -> Optional[List[DataJobExecution]]:
        return []
7 changes: 7 additions & 0 deletions projects/vdk-plugins/vdk-dag/tests/jobs/fail-job/1_step.py
@@ -0,0 +1,7 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    raise ArithmeticError("cannot do math :(")
2 changes: 2 additions & 0 deletions projects/vdk-plugins/vdk-dag/tests/jobs/local-dag/config.ini
@@ -0,0 +1,2 @@
[owner]
team = team-awesome
28 changes: 28 additions & 0 deletions projects/vdk-plugins/vdk-dag/tests/jobs/local-dag/dag.py
@@ -0,0 +1,28 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import tempfile

from vdk.api.job_input import IJobInput
from vdk.plugin.dag.dag_runner import DagInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    log.info(f"Dummy arguments {job_input.get_arguments()}")
    with tempfile.NamedTemporaryFile(prefix="temp-", mode="w+") as temp_file:
        job1 = dict(
            job_name="write-file-job",
            depends_on=[],
            arguments=dict(path=temp_file.name),
        )
        job2 = dict(
            job_name="fail-job", depends_on=["write-file-job"], fail_dag_on_error=False
        )
        job3 = dict(
            job_name="read-file-job",
            depends_on=["write-file-job"],
            arguments=dict(path=temp_file.name),
        )
        DagInput().run_dag([job1, job2, job3])
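This test DAG runs write-file-job first; fail-job and read-file-job both depend on it, and fail-job's deliberate failure does not fail the whole DAG because `fail_dag_on_error=False`.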
14 changes: 14 additions & 0 deletions projects/vdk-plugins/vdk-dag/tests/jobs/read-file-job/1_step.py
@@ -0,0 +1,14 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import pathlib

from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    path = job_input.get_arguments().get("path")
    if path:
        if pathlib.Path(path).read_text() != "data":
            raise ValueError(f"Expected file {path} content to be 'data'")
    else:
        raise ValueError("No path provided.")