vdk-meta-jobs: exec job with arguments #1839

Merged on Apr 11, 2023 (53 commits; diff shown from 44 commits).
Commits (53):
3d202ba  vdk-meta-jobs: exec job with arguments (yonitoo, Apr 4, 2023)
ede7ed4  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Apr 4, 2023)
3cac5c2  control-service: update job builders for aws temporary credentials (#… (Mar 30, 2023)
93bd11c  build(deps): Bump com.palantir.git-version from 2.0.0 to 3.0.0 in /pr… (dependabot[bot], Mar 30, 2023)
84d24b2  frontend: prepare for official release shared and dp libs (#1795) (gorankokin, Mar 31, 2023)
7c5dc86  vdk-plugins: test only oldest and newest supported python version (#1… (antoniivanov, Mar 31, 2023)
eea7070  [DRAFT] control-service: fix failing image publisher (#1810) (murphp15, Mar 31, 2023)
4555511  vdk-cicd: set ephemeral storage request/limits (#1813) (antoniivanov, Mar 31, 2023)
c22b566  build(deps): Bump io.swagger.core.v3:swagger-annotations from 2.2.8 t… (dependabot[bot], Mar 31, 2023)
d8ff2bb  build(deps): Bump io.swagger.core.v3:swagger-models from 2.2.8 to 2.2… (dependabot[bot], Mar 31, 2023)
c9f5847  frontend: add build.sh (#1807) (antoniivanov, Mar 31, 2023)
a1dd05d  control-service: update helm charts for service account credentials (… (Apr 2, 2023)
cdf5ccc  control-service: enable usage of aws temporary credentials (#1787) (Apr 2, 2023)
eff5d5a  build(deps): Bump org.mockito:mockito-core (dependabot[bot], Apr 2, 2023)
8cd3cdf  Person/hduygu/vdk jupyter UI create deployment handlers (#1788) (duyguHsnHsn, Apr 3, 2023)
c5b9096  vdk-control-cli: fix circular import dependecy (#1820) (antoniivanov, Apr 3, 2023)
b6407af  control-service: force job builder deploy (#1823) (Apr 3, 2023)
e0f6395  vdk-cicd: apply limit ranges for storage (#1815) (antoniivanov, Apr 3, 2023)
571fcc4  vdk-lineage: support for latest version sqllineage library (#1816) (antoniivanov, Apr 3, 2023)
e7ea3fd  vdk-control-cli: use assert_click_status (#1817) (antoniivanov, Apr 3, 2023)
4da99f2  build(deps): Bump com.palantir.docker from 0.34.0 to 0.35.0 in /proje… (dependabot[bot], Apr 4, 2023)
b07dd86  [pre-commit.ci] pre-commit autoupdate (#1838) (pre-commit-ci[bot], Apr 4, 2023)
1ec1252  build(deps): Bump org.springframework.retry:spring-retry from 2.0.0 t… (dependabot[bot], Apr 4, 2023)
beb7eb1  vdk-control-cli: vdk list -mmm to return executions (#1818) (antoniivanov, Apr 4, 2023)
ca7e414  Merge branch 'main' into person/ysalambashev/dags-exec-job-with-args (yonitoo, Apr 4, 2023)
a10bdbb  vdk-meta-jobs: address review comments (yonitoo, Apr 4, 2023)
236f16a  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Apr 4, 2023)
61254fa  vdk-meta-jobs: add unit test args assertions, remove args propagation (yonitoo, Apr 4, 2023)
f8fd1b0  Merge branch 'main' into person/ysalambashev/dags-exec-job-with-args (yonitoo, Apr 4, 2023)
5627113  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Apr 4, 2023)
424e7ca  vdk-meta-jobs: extract helper method for getting job args (yonitoo, Apr 5, 2023)
f1373fa  Merge branch 'main' into person/ysalambashev/dags-exec-job-with-args (yonitoo, Apr 5, 2023)
02b2138  vdk-meta-jobs: update README (yonitoo, Apr 5, 2023)
92fbc19  Merge branch 'main' into person/ysalambashev/dags-exec-job-with-args (yonitoo, Apr 5, 2023)
1a35ab8  vdk-meta-jobs: make use of IJobArguments interface (yonitoo, Apr 6, 2023)
763c8c4  Merge branch 'main' into person/ysalambashev/dags-exec-job-with-args (yonitoo, Apr 6, 2023)
13bcf05  Merge branch 'main' into person/ysalambashev/dags-exec-job-with-args (yonitoo, Apr 6, 2023)
0b200e9  Merge branch 'main' into person/ysalambashev/dags-exec-job-with-args (yonitoo, Apr 6, 2023)
39f5b8e  vdk-meta-jobs: optimize imports (yonitoo, Apr 6, 2023)
f3f6a05  vdk-meta-jobs: address review comments (yonitoo, Apr 6, 2023)
0bdd939  Merge branch 'main' into person/ysalambashev/dags-exec-job-with-args (yonitoo, Apr 6, 2023)
c3545ac  vdk-meta-jobs: fix Codacy warnings (yonitoo, Apr 6, 2023)
628d373  Merge branch 'main' into person/ysalambashev/dags-exec-job-with-args (yonitoo, Apr 6, 2023)
b546e9f  Merge branch 'main' into person/ysalambashev/dags-exec-job-with-args (yonitoo, Apr 7, 2023)
6eb3ac4  Merge branch 'main' into person/ysalambashev/dags-exec-job-with-args (yonitoo, Apr 10, 2023)
ae4ea57  Merge branch 'main' into person/ysalambashev/dags-exec-job-with-args (yonitoo, Apr 11, 2023)
e2b2bf0  vdk-meta-jobs: add some dag validation code (yonitoo, Apr 11, 2023)
4678a4c  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Apr 11, 2023)
165b45b  vdk-meta-jobs: change test prefixes to dag, fix minor validation issues (yonitoo, Apr 11, 2023)
6b78c95  Merge branch 'main' into person/ysalambashev/dags-exec-job-with-args (yonitoo, Apr 11, 2023)
e5a6d7c  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Apr 11, 2023)
6aebc48  vdk-meta-jobs: minor change (yonitoo, Apr 11, 2023)
4767379  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Apr 11, 2023)
10 changes: 9 additions & 1 deletion projects/vdk-plugins/vdk-meta-jobs/README.md
@@ -28,6 +28,7 @@ def run(job_input):
"job_name": "name-of-job",
"team_name": "team-of-job",
"fail_meta_job_on_error": True or False,
"arguments": {"key1": value1, "key2": value2},
"depends_on": [name-of-job1, name-of-job2]
},
...
@@ -38,7 +39,8 @@ def run(job_input):
When defining a job to be run, the following attributes are supported:
* **job_name**: required, the name of the data job
* **team_name**: optional, the team of the data job. If omitted, the meta job's team is used.
* **fail_meta_job_on_error**: optional, default is true. If true, the meta job will abort and fail when the orchestrated job fails; if false, the meta job will continue.
* **arguments**: optional, the arguments that are passed to the underlying orchestrated data job.
* **depends_on**: required (can be empty), list of other jobs that the orchestrated job depends on. The job will not be started until the depends_on jobs have finished.


@@ -65,38 +67,44 @@ JOBS_RUN_ORDER = [
"job_name": "job1",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"arguments": {},
"depends_on": []
},

{
"job_name": "job2",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"arguments": {},
"depends_on": ["job1"]
},
{
"job_name": "job3",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"arguments": {},
"depends_on": ["job1"]
},
{
"job_name": "job4",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"arguments": {},
"depends_on": ["job1"]
},

{
"job_name": "job5",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"arguments": {},
"depends_on": ["job3"]
},
{
"job_name": "job6",
"team_name": "team-awesome",
"fail_meta_job_on_error": True,
"arguments": {},
"depends_on": ["job3"]
},
]
@@ -16,6 +16,7 @@ class SingleJob:
job_name: str
team_name: str = None
fail_meta_job_on_error: bool = True
arguments: dict = None
depends_on: List[str] = field(default_factory=list)
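The field defaults above can be illustrated with a runnable, stdlib-only sketch. The class here is a simplified stand-in for the plugin's `SingleJob`, with the same field names as the diff; note that `arguments` defaults to `None` rather than `{}` so instances never share one mutable dict, while `depends_on` uses `default_factory` to give each instance its own fresh list:

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class SingleJob:
    # Simplified mirror of the tracked-job record from the diff above.
    job_name: str
    team_name: Optional[str] = None
    fail_meta_job_on_error: bool = True
    # None (rather than {}) avoids one shared mutable default dict.
    arguments: Optional[dict] = None
    # default_factory gives every instance its own list.
    depends_on: List[str] = field(default_factory=list)

job = SingleJob(job_name="job2", arguments={"table_name": "test_table"}, depends_on=["job1"])
```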


@@ -2,12 +2,12 @@
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import os
import time
from typing import Dict
from typing import Optional

import urllib3.exceptions as url_exception
from vdk.api.job_input import IJobArguments
from vdk.internal.core.errors import ErrorMessage
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.meta_jobs.meta import IDataJobExecutor
@@ -44,7 +44,7 @@ def start_job(self, job_name: str) -> None:
job = self.__get_job(job_name)
job.start_attempt += 1
execution_id = self.start_new_job_execution(
job_name=job.job_name, team_name=job.team_name, arguments=job.arguments
)
log.info(f"Starting new data job execution with id {execution_id}")
job.execution_id = execution_id
@@ -135,7 +135,9 @@ def get_all_jobs(self):
def get_currently_running_jobs(self):
return [j for j in self._jobs_cache.values() if j.status in ACTIVE_JOB_STATUSES]

def start_new_job_execution(
self, job_name: str, team_name: str, arguments: IJobArguments = None
) -> str:
"""
Start a new data job execution.
The stages of the process are:
@@ -152,6 +154,7 @@ def start_new_job_execution(self, job_name: str, team_name: str) -> str:

:param job_name: name of the data job to be executed
:param team_name: name of the owning team
:param arguments: arguments of the data job
:return: id of the started job execution
"""
current_retries = 0
@@ -163,7 +166,7 @@ def start_new_job_execution(self, job_name: str, team_name: str) -> str:

while current_retries < ALLOWED_RETRIES:
try:
execution_id = self._executor.start_job(job_name, team_name, arguments)
return execution_id
except url_exception.TimeoutError as e:
log.info(
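The bounded-retry loop in `start_new_job_execution` can be sketched in isolation. This is a stdlib-only illustration: `ALLOWED_RETRIES` matches the constant named in the diff but its value here is an assumption, the sleep interval is illustrative, and the built-in `TimeoutError` stands in for `urllib3.exceptions.TimeoutError`:

```python
import time

ALLOWED_RETRIES = 3           # the plugin's actual budget may differ
RETRY_SLEEP_SECONDS = 0.01    # illustrative; not the plugin's real delay

def start_with_retries(start_fn, job_name, team_name, arguments=None) -> str:
    """Call start_fn until it succeeds or the retry budget is exhausted."""
    current_retries = 0
    while current_retries < ALLOWED_RETRIES:
        try:
            return start_fn(job_name, team_name, arguments)
        except TimeoutError:
            # Transient failure: count it, back off briefly, try again.
            current_retries += 1
            time.sleep(RETRY_SLEEP_SECONDS)
    raise RuntimeError(f"Could not start job {job_name} after {ALLOWED_RETRIES} attempts.")
```

Only timeouts are retried; any other exception propagates immediately, which mirrors the narrow `except` in the diff.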
@@ -1,6 +1,7 @@
# Copyright 2023-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import graphlib
import json
import logging
from collections import namedtuple
from typing import Dict
@@ -14,7 +15,13 @@
ERROR = Error(
TYPE="type", PERMISSION="permission", REQUIREMENT="requirement", CONFLICT="conflict"
)
allowed_job_keys = {
"job_name",
"team_name",
"fail_meta_job_on_error",
"depends_on",
"arguments",
}
required_job_keys = {"job_name", "depends_on"}


@@ -66,6 +73,7 @@ def _validate_job(self, job: Dict):
self._validate_dependencies(job)
self._validate_team_name(job)
self._validate_fail_meta_job_on_error(job)
self._validate_arguments(job)
log.info(f"Successfully validated job: {job['job_name']}")

def _validate_allowed_and_required_keys(self, job: Dict):
@@ -141,6 +149,27 @@ def _validate_fail_meta_job_on_error(self, job: Dict):
f" is {type(job['fail_meta_job_on_error'])}. Expected type is bool.",
)

def _validate_arguments(self, job: Dict):
if "arguments" in job:
if not isinstance(job["arguments"], dict):
self._raise_error(
job["job_name"],
ERROR.TYPE,
"The type of the job dict key arguments is not dict.",
f"Change the Data Job Dict value of arguments. "
f"Current type is {type(job['arguments'])}. Expected type is dict.",
)
try:
json.dumps(job["arguments"])
except TypeError as e:
self._raise_error(
job["job_name"],
ERROR.TYPE,
str(e),
f"Change the Data Job Dict value of arguments. "
f"The current value of type {type(job['arguments'])} is not JSON-serializable.",
)
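`_validate_arguments` boils down to two checks: the value must be a dict, and it must survive `json.dumps`. A standalone, runnable sketch of the same logic (a plain `TypeError` stands in for the plugin's `_raise_error` machinery):

```python
import json

def validate_arguments(job: dict) -> None:
    """'arguments' is optional; when present it must be a JSON-serializable dict."""
    if "arguments" not in job:
        return
    if not isinstance(job["arguments"], dict):
        raise TypeError(
            f"Job {job['job_name']!r}: 'arguments' must be a dict, "
            f"got {type(job['arguments']).__name__}."
        )
    try:
        # json.dumps raises TypeError for values like sets or arbitrary objects.
        json.dumps(job["arguments"])
    except TypeError as e:
        raise TypeError(f"Job {job['job_name']!r}: 'arguments' is not JSON-serializable: {e}")
```

The serializability check matters because the arguments are shipped to the Control Service with the execution request, not consumed locally.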

def _check_dag_cycles(self, jobs: List[Dict]):
topological_sorter = graphlib.TopologicalSorter()
for job in jobs:
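The cycle check above leans on stdlib `graphlib` (Python 3.9+): `TopologicalSorter.prepare()` raises `CycleError` when the dependency graph is not a DAG. A minimal sketch of that pattern, independent of the plugin:

```python
import graphlib

def check_dag_cycles(jobs: list) -> list:
    """Return the nodes of one dependency cycle, or [] if the graph is a DAG."""
    sorter = graphlib.TopologicalSorter()
    for job in jobs:
        # Each job depends on (i.e. is preceded by) its depends_on entries.
        sorter.add(job["job_name"], *job["depends_on"])
    try:
        sorter.prepare()  # raises CycleError for cyclic graphs
    except graphlib.CycleError as e:
        return e.args[1]  # second arg is the cycle as a list of nodes
    return []
```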
@@ -6,14 +6,16 @@
from typing import Optional

from taurus_datajob_api import DataJobExecution
from vdk.api.job_input import IJobArguments
from vdk.plugin.meta_jobs.api import meta_job


class IDataJobExecutor(abc.ABC):
@abc.abstractmethod
def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None):
"""
Start an execution of a data job and returns its execution id
:param job_name:
:param team_name:
:param arguments: optional arguments for the execution
:return: execution id
@@ -22,7 +22,11 @@


class MetaJobsDag:
def __init__(
self,
team_name: str,
meta_config: MetaPluginConfiguration,
):
self._team_name = team_name
self._topological_sorter = TopologicalSorter()
self._delayed_starting_jobs = TimeBasedQueue(
@@ -49,6 +53,7 @@ def build_dag(self, jobs: List[Dict]):
job["job_name"],
job.get("team_name", self._team_name),
job.get("fail_meta_job_on_error", True),
job.get("arguments", None),
)
self._job_executor.register_job(trackable_job)
self._topological_sorter.add(trackable_job.job_name, *job["depends_on"])
@@ -14,6 +14,7 @@
from taurus_datajob_api import DataJobExecutionRequest
from taurus_datajob_api import DataJobsExecutionApi
from urllib3 import Retry
from vdk.api.job_input import IJobArguments
from vdk.plugin.control_api_auth.authentication import Authentication

log = logging.getLogger(__name__)
@@ -49,12 +50,14 @@ def __init__(
job_name: str,
team_name: str,
rest_api_url: str,
arguments: IJobArguments = None,
timeout: int = 5, # TODO: Set reasonable default
**kwargs,
) -> None:
self.__job_name = job_name
self.__team_name = team_name
self.__rest_api_url = rest_api_url
self.__arguments = arguments
self.timeout = timeout
self.deployment_id = "production" # currently multiple deployments are not supported so this remains hardcoded
self.auth: Optional[Authentication] = kwargs.pop("auth", None)
@@ -78,19 +81,19 @@ def __init__(

self.__execution_api = self._get_execution_api()

def start_job_execution(self) -> str:
"""
Triggers a manual Datajob execution.

"""
execution_request = DataJobExecutionRequest(
started_by="meta-job", # TODO: specify name of meta job
args=self.__arguments,
)
_, _, headers = self.__execution_api.data_job_execution_start_with_http_info(
team_name=self.__team_name,
job_name=self.__job_name,
deployment_id=self.deployment_id,
data_job_execution_request=execution_request,
_request_timeout=self.timeout,
@@ -107,8 +110,8 @@ def cancel_job_execution(self, execution_id: str) -> None:
:param execution_id: ID of the job execution
"""
self.__execution_api.data_job_execution_cancel(
team_name=self.__team_name,
job_name=self.__job_name,
execution_id=execution_id,
_request_timeout=self.timeout,
)
@@ -121,7 +124,9 @@ def get_job_execution_log(self, execution_id: str) -> str:
:return: job execution logs
"""
return self.__execution_api.data_job_logs_download(
team_name=self.__team_name,
job_name=self.__job_name,
execution_id=execution_id,
).logs

def get_job_execution_status(self, execution_id: str) -> DataJobExecution:
Expand All @@ -132,7 +137,9 @@ def get_job_execution_status(self, execution_id: str) -> DataJobExecution:
:return: The execution status object listing details about the status of this particular execution
"""
job_execution: DataJobExecution = self.__execution_api.data_job_execution_read(
team_name=self.__team_name,
job_name=self.__job_name,
execution_id=execution_id,
)
return job_execution

Expand All @@ -144,7 +151,7 @@ def get_job_executions(self) -> Optional[List[DataJobExecution]]:
:return: A list of DataJobExecution objects for the available executions.
"""
job_execution_list = self.__execution_api.data_job_execution_list(
team_name=self.__team_name, job_name=self.__job_name
)
return job_execution_list

Expand Down Expand Up @@ -200,7 +207,7 @@ def wait_for_job(
return job_status

def _get_execution_api(self):
rest_api_url = self.__rest_api_url

config = Configuration(host=rest_api_url, api_key=None)
config.connection_pool_maxsize = self.http_connection_pool_maxsize
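The rename from `self.job_name` to `self.__job_name` throughout this file relies on Python name mangling: a leading double underscore makes the attribute effectively private to the class. A quick stand-in illustration (this class is not the real `RemoteDataJob`):

```python
class JobSketch:
    def __init__(self, job_name: str):
        # Double underscore triggers name mangling: stored as _JobSketch__job_name.
        self.__job_name = job_name

    def name(self) -> str:
        # Inside the class, the mangled attribute is accessible as written.
        return self.__job_name

job = JobSketch("job1")
# There is no public attribute "job_name" anymore; only the mangled name exists.
```

Callers that previously reached into `job.job_name` would break after this change, which is the point: the execution API surface, not the attributes, is the contract.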
@@ -4,15 +4,18 @@
from typing import Optional

from taurus_datajob_api import DataJobExecution
from vdk.api.job_input import IJobArguments
from vdk.internal.control.configuration.vdk_config import VDKConfig
from vdk.plugin.meta_jobs.meta import IDataJobExecutor
from vdk.plugin.meta_jobs.remote_data_job import RemoteDataJob


class RemoteDataJobExecutor(IDataJobExecutor):
def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None):
vdk_cfg = VDKConfig()
job = RemoteDataJob(
job_name, team_name, vdk_cfg.control_service_rest_api_url, arguments
)
return job.start_job_execution()
# catch error on 409

@@ -0,0 +1,2 @@
[owner]
team = team-awesome
@@ -0,0 +1,23 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
log.info(f"Dummy arguments {job_input.get_arguments()}")

job1 = dict(job_name="job1", depends_on=[])
job2 = dict(
job_name="job2",
depends_on=["job1"],
fail_meta_job_on_error=False,
arguments={"table_name": "test_table", "counter": 123},
)
job3 = dict(job_name="job3", depends_on=["job1"])
job4 = dict(job_name="job4", depends_on=["job2", "job3"])
MetaJobInput().run_meta_job([job1, job2, job3, job4])
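For the four-job DAG in this test, the admissible execution order can be checked with stdlib `graphlib` alone. This is a sketch of the ordering constraint only; the plugin layers its own scheduling and execution on top of `TopologicalSorter`:

```python
import graphlib

# Map each job to its predecessors, matching the depends_on lists above.
deps = {"job1": [], "job2": ["job1"], "job3": ["job1"], "job4": ["job2", "job3"]}
order = list(graphlib.TopologicalSorter(deps).static_order())
# job1 runs first and job4 last; job2 and job3 may come in either order between them.
```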
@@ -0,0 +1,2 @@
[owner]
team = team-awesome
@@ -0,0 +1,23 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
log.info(f"Dummy arguments {job_input.get_arguments()}")

job1 = dict(job_name="job1", depends_on=[])
job2 = dict(
job_name="job2",
depends_on=["job1"],
fail_meta_job_on_error=False,
arguments={},
)
job3 = dict(job_name="job3", depends_on=["job1"])
job4 = dict(job_name="job4", depends_on=["job2", "job3"])
MetaJobInput().run_meta_job([job1, job2, job3, job4])
@@ -0,0 +1,2 @@
[owner]
team = team-awesome