-
Notifications
You must be signed in to change notification settings - Fork 59
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
In order to be able to test locally a full dag, we need a local executor which will run the data jobs locally. The way it works user sets `DAGS_JOB_EXECUTOR_TYPE=local` and must ensure all jobs are at the same level as the DAG job or the current working directory in order to find them
- Loading branch information
1 parent
78686fa
commit 8a326b6
Showing
13 changed files
with
415 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
205 changes: 205 additions & 0 deletions
205
projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,205 @@ | ||
# Copyright 2021-2024 VMware, Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
import json | ||
import os | ||
import pathlib | ||
import pprint | ||
import shlex | ||
import subprocess | ||
import tempfile | ||
from datetime import datetime | ||
from typing import Dict | ||
from typing import List | ||
from typing import Optional | ||
|
||
from taurus_datajob_api import DataJobExecution | ||
from vdk.api.job_input import IJobArguments | ||
from vdk.internal.builtin_plugins.run.run_status import ExecutionStatus | ||
from vdk.internal.builtin_plugins.run.summary_output import JobSummaryParser | ||
from vdk.internal.core.error_classifiers import ResolvableBy | ||
from vdk.plugin.dag.dags import IDataJobExecutor | ||
from vdk.plugin.dag.remote_data_job import JobStatus | ||
|
||
RUNNING_STATUSES = [JobStatus.RUNNING.value, JobStatus.SUBMITTED.value] | ||
|
||
|
||
class LocalDataJob: | ||
def __init__(self, job_path: str, job_name: str, team_name: str, arguments: dict): | ||
self._job_name = job_name | ||
self._team_name = team_name | ||
self._job_path = job_path | ||
self._arguments = arguments | ||
|
||
temp_dir = os.path.join(tempfile.gettempdir(), "vdk-jobs", job_name) | ||
pathlib.Path(temp_dir).mkdir(parents=True, exist_ok=True) | ||
self._log_file = os.path.join(temp_dir, "run.log") | ||
self._summary_file = os.path.join(temp_dir, "run-summary.json") | ||
self._log_file_handle = None | ||
self._process = None | ||
self._start_time = None | ||
self._end_time = None | ||
self._message = None | ||
|
||
def close(self): | ||
if self._log_file_handle: | ||
self._log_file_handle.close() | ||
|
||
@staticmethod | ||
def __prepare_vdk_run_command(path: str, arguments: dict): | ||
path = shlex.quote(str(path)) | ||
cmd: list[str] = ["vdk", "run", f"{path}"] | ||
if arguments: | ||
arguments = json.dumps(arguments) | ||
cmd.append("--arguments") | ||
cmd.append(f"{arguments}") | ||
return cmd | ||
|
||
def start_run(self): | ||
cmd = self.__prepare_vdk_run_command(self._job_path, self._arguments) | ||
|
||
environ_copy = os.environ.copy() | ||
environ_copy["JOB_RUN_SUMMARY_FILE_PATH"] = self._summary_file | ||
|
||
self._log_file_handle = pathlib.Path(self._log_file).open(mode="w+") | ||
|
||
self._process = subprocess.Popen( | ||
cmd, | ||
stdout=self._log_file_handle, | ||
stderr=self._log_file_handle, | ||
env=environ_copy, | ||
shell=False, | ||
) | ||
self._start_time = datetime.now() | ||
self._message = {"logs": self._log_file} | ||
return str(self._process.pid) | ||
|
||
def details(self) -> Dict: | ||
details = dict( | ||
start_time=self._start_time, | ||
end_time=self._end_time, | ||
status=self.status(), | ||
message=self._message, | ||
started_by="", | ||
type="local", | ||
deployment=None, | ||
) | ||
return details | ||
|
||
def status(self) -> str: | ||
if not self._process: | ||
return JobStatus.SUBMITTED.value | ||
|
||
result = self._process.poll() | ||
if result is None: | ||
return JobStatus.RUNNING.value | ||
|
||
if not self._end_time: | ||
self._end_time = datetime.now() | ||
|
||
if os.path.exists(self._summary_file): | ||
content = pathlib.Path(self._summary_file).read_text() | ||
job_summary = JobSummaryParser.from_json(content) | ||
|
||
if job_summary.status == ExecutionStatus.SUCCESS: | ||
return JobStatus.SUCCEEDED.value | ||
if job_summary.status == ExecutionStatus.SKIP_REQUESTED: | ||
return JobStatus.SKIPPED.value | ||
|
||
# following indicates failure so we want to include summary | ||
self._message = {"summary": json.loads(content), "logs": self._log_file} | ||
|
||
if job_summary.blamee == ResolvableBy.USER_ERROR: | ||
return JobStatus.USER_ERROR.value | ||
if job_summary.blamee == ResolvableBy.CONFIG_ERROR: | ||
return JobStatus.USER_ERROR.value | ||
|
||
if job_summary.blamee == ResolvableBy.PLATFORM_ERROR: | ||
return JobStatus.PLATFORM_ERROR.value | ||
else: | ||
if result != 0: | ||
self._message = { | ||
"summary": {"exit_code": result}, | ||
"logs": self._log_file, | ||
} | ||
return JobStatus.PLATFORM_ERROR.value | ||
else: | ||
return JobStatus.SUCCEEDED.value | ||
|
||
raise Exception("Bug! That code path should not happen") | ||
|
||
|
||
class LocalDataJobRunException(Exception): | ||
def __init__(self, job_name: str, team_name: str, message: str): | ||
super().__init__(f"Data Job {job_name} of team {team_name}: {message}") | ||
self.job_name = job_name | ||
self.team_name = team_name | ||
|
||
|
||
class LocalDataJobExecutor(IDataJobExecutor): | ||
""" | ||
This module is responsible for executing local Data Jobs. | ||
""" | ||
|
||
def __init__(self): | ||
self._running_jobs: Dict[str, LocalDataJob] = dict() | ||
|
||
@staticmethod | ||
def __find_job_path(job_name: str): | ||
candidates = [ | ||
os.getcwd(), | ||
] | ||
candidates += [ | ||
part | ||
for part in os.environ.get("DAG_LOCAL_RUN_JOB_PATH", "").split(":") | ||
if part | ||
] | ||
|
||
for candidate in candidates: | ||
candidate_job_path = os.path.join(candidate, job_name) | ||
if os.path.isdir(candidate_job_path): | ||
return candidate_job_path | ||
|
||
raise LocalDataJobRunException( | ||
job_name, "", f"Cannot find the job directory. Search paths: {candidates}" | ||
) | ||
|
||
def start_job( | ||
self, | ||
job_name: str, | ||
team_name: str, | ||
started_by: str = None, | ||
arguments: dict = None, | ||
): | ||
if job_name in self._running_jobs: | ||
raise LocalDataJobRunException( | ||
job_name, | ||
team_name, | ||
f"Job run already has been started. Cannot start same job twice.", | ||
) | ||
job_path = self.__find_job_path(job_name) | ||
|
||
job = LocalDataJob( | ||
job_path, | ||
job_name, | ||
team_name, | ||
arguments, | ||
) | ||
self._running_jobs[job_name] = job | ||
return job.start_run() | ||
|
||
def status_job(self, job_name: str, team_name: str, execution_id: str) -> str: | ||
if job_name in self._running_jobs: | ||
return self._running_jobs.get(job_name).status() | ||
else: | ||
return None # TODO: or raise ? | ||
|
||
def details_job(self, job_name: str, team_name: str, execution_id: str) -> dict: | ||
if job_name in self._running_jobs: | ||
return self._running_jobs.get(job_name).details() | ||
else: | ||
return dict() # TODO: or raise ? | ||
|
||
def job_executions_list( | ||
self, job_name: str, team_name: str | ||
) -> Optional[List[DataJobExecution]]: | ||
return [] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# Copyright 2021-2024 VMware, Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from vdk.api.job_input import IJobInput | ||
|
||
|
||
def run(job_input: IJobInput): | ||
raise ArithmeticError("cannot do math :(") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
[owner] | ||
team = team-awesome |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# Copyright 2021-2024 VMware, Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
import logging | ||
import tempfile | ||
|
||
from vdk.api.job_input import IJobInput | ||
from vdk.plugin.dag.dag_runner import DagInput | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
def run(job_input: IJobInput): | ||
log.info(f"Dummy arguments {job_input.get_arguments()}") | ||
with tempfile.NamedTemporaryFile(prefix="temp-", mode="w+") as temp_file: | ||
job1 = dict( | ||
job_name="write-file-job", | ||
depends_on=[], | ||
arguments=dict(path=temp_file.name), | ||
) | ||
job2 = dict( | ||
job_name="fail-job", depends_on=["write-file-job"], fail_dag_on_error=False | ||
) | ||
job3 = dict( | ||
job_name="read-file-job", | ||
depends_on=["write-file-job"], | ||
arguments=dict(path=temp_file.name), | ||
) | ||
DagInput().run_dag([job1, job2, job3]) |
14 changes: 14 additions & 0 deletions
14
projects/vdk-plugins/vdk-dag/tests/jobs/read-file-job/1_step.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# Copyright 2021-2024 VMware, Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
import pathlib | ||
|
||
from vdk.api.job_input import IJobInput | ||
|
||
|
||
def run(job_input: IJobInput): | ||
path = job_input.get_arguments().get("path") | ||
if path: | ||
if pathlib.Path(path).read_text() != "data": | ||
raise ValueError(f"Expected file {path} content to be 'data'") | ||
else: | ||
raise ValueError("No path provided.") |
11 changes: 11 additions & 0 deletions
11
projects/vdk-plugins/vdk-dag/tests/jobs/write-file-job/1_step.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
# Copyright 2021-2024 VMware, Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
import pathlib | ||
|
||
from vdk.api.job_input import IJobInput | ||
|
||
|
||
def run(job_input: IJobInput): | ||
path = job_input.get_arguments().get("path") | ||
if path: | ||
pathlib.Path(path).write_text("data") |
Oops, something went wrong.