Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-dag: DAGs propagate their execution type to their component jobs #2080

Merged
merged 22 commits into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
20db712
vdk-dag: propagate dag execution type downstream
yonitoo Apr 25, 2023
a77528d
Merge branch 'main' into person/ysalambashev/vdk-dag-propagate-exec-type
yonitoo Apr 28, 2023
83cf4fd
Merge branch 'main' into person/ysalambashev/vdk-dag-propagate-exec-type
yonitoo May 2, 2023
36e71df
Merge branch 'main' into person/ysalambashev/vdk-dag-propagate-exec-type
yonitoo May 10, 2023
870e46a
vdk-dag: fix trackable_job started_by passing on build
yonitoo May 10, 2023
f2ed520
Merge branch 'main' into person/ysalambashev/vdk-dag-propagate-exec-type
yonitoo May 17, 2023
ef37d9c
vdk-dag: add request handler for testing purposes
yonitoo May 17, 2023
b63976d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 17, 2023
20b3b66
control-service: upgrade python client (#2076)
murphp15 May 17, 2023
aa48dcf
quickstart-vdk: ignore explore page and widgets in frontend (#2073)
DeltaMichael May 17, 2023
a924b17
vdk-airflow: fix failing tests (#2078)
murphp15 May 17, 2023
5f616b6
vdk-control-cli: upgrade python client (#2077)
murphp15 May 17, 2023
a37688d
build(deps): Bump io.swagger.core.v3:swagger-annotations from 2.2.9 t…
dependabot[bot] May 18, 2023
edf3b04
vdk-dag: add unit test
yonitoo May 18, 2023
5fc3191
Merge branch 'main' into person/ysalambashev/vdk-dag-propagate-exec-type
yonitoo May 18, 2023
a1a1c72
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 18, 2023
abd6828
vdk-jupyter: UI failing cell marks (#2057)
duyguHsnHsn May 18, 2023
cf95b37
vdk-jupyter: pin npm packages to previous version (#2088)
duyguHsnHsn May 18, 2023
cdcc20f
vdk-dag: address review comments
yonitoo May 19, 2023
d2b0e54
Merge branch 'main' into person/ysalambashev/vdk-dag-propagate-exec-type
yonitoo May 19, 2023
f63575d
vdk-airflow: fix failing tests
murphp15 May 19, 2023
d3ae7d2
vdk-airflow: fix failing tests
murphp15 May 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ def start_job(self, job_name: str) -> None:
job = self.__get_job(job_name)
job.start_attempt += 1
execution_id = self.start_new_job_execution(
job_name=job.job_name, team_name=job.team_name, arguments=job.arguments
job_name=job.job_name,
team_name=job.team_name,
started_by=job.details.get("started_by"),
arguments=job.arguments,
)
log.info(f"Starting new data job execution with id {execution_id}")
job.execution_id = execution_id
Expand Down Expand Up @@ -138,6 +141,19 @@ def status(self, job_name: str) -> str:
log.debug(f"Job status: {job}")
return job.status

def execution_type(self, job_name: str, team_name: str, execution_id: str) -> str:
"""
Gets the execution type of a job.

:param execution_id: the execution id of the job
:param team_name: the name of the owning team
:param job_name: the name of the job
:return: the job execution type (manual/scheduled)
"""
details = self._executor.details_job(job_name, team_name, execution_id)
log.debug(f"Job execution type: {details.get('type')}")
return details.get("type", "manual")

def get_finished_job_names(self):
"""
:return: list of the names of all the finalized jobs
Expand Down Expand Up @@ -175,7 +191,11 @@ def get_currently_running_jobs(self):
return [j for j in self._jobs_cache.values() if j.status in ACTIVE_JOB_STATUSES]

def start_new_job_execution(
self, job_name: str, team_name: str, arguments: IJobArguments = None
self,
job_name: str,
team_name: str,
started_by: str = None,
arguments: IJobArguments = None,
) -> str:
"""
Start a new data job execution.
Expand All @@ -193,6 +213,7 @@ def start_new_job_execution(

:param job_name: name of the data job to be executed
:param team_name: name of the owning team
:param started_by: the execution type and the name of the DAG job
:param arguments: arguments of the data job
:return: id of the started job execution
"""
Expand All @@ -205,7 +226,9 @@ def start_new_job_execution(

while current_retries < ALLOWED_RETRIES:
try:
execution_id = self._executor.start_job(job_name, team_name, arguments)
execution_id = self._executor.start_job(
job_name, team_name, started_by, arguments
)
return execution_id
except url_exception.TimeoutError as e:
log.info(
Expand Down
29 changes: 28 additions & 1 deletion projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@


class Dag:
def __init__(self, team_name: str, dags_config: DagPluginConfiguration):
def __init__(
self,
team_name: str,
dags_config: DagPluginConfiguration,
job_name: str = None,
execution_id: str = None,
):
"""
This module deals with all the DAG-related operations such as build and execute.

Expand All @@ -47,6 +53,25 @@ def __init__(self, team_name: str, dags_config: DagPluginConfiguration):
time_between_status_check_seconds=dags_config.dags_time_between_status_check_seconds(),
)
self._dag_validator = DagValidator()
if job_name is not None and execution_id is not None:
try:
self._started_by = (
self._job_executor.execution_type(job_name, team_name, execution_id)
+ "/"
+ job_name
)
except ApiException as e:
if e.status == 404:
log.debug(
f"Local job runs return 404 status when getting the execution type: {e}"
)
else:
log.info(
f"Unexpected error while checking for job execution type: {e}"
)
self._started_by = f"manual/{job_name}"
else:
self._started_by = "manual/default"

def build_dag(self, jobs: List[Dict]):
"""
Expand All @@ -62,7 +87,9 @@ def build_dag(self, jobs: List[Dict]):
job.get("team_name", self._team_name),
job.get("fail_dag_on_error", True),
job.get("arguments", None),
job.get("details", {}),
)
trackable_job.details = {"started_by": self._started_by}
self._job_executor.register_job(trackable_job)
self._topological_sorter.add(trackable_job.job_name, *job["depends_on"])

Expand Down
5 changes: 5 additions & 0 deletions projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from vdk.internal.builtin_plugins.config.job_config import JobConfigKeys
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core.config import ConfigurationBuilder
from vdk.internal.core.statestore import CommonStoreKeys
from vdk.plugin.dag import dag_runner
from vdk.plugin.dag.dag_plugin_configuration import add_definitions
from vdk.plugin.dag.dag_plugin_configuration import DagPluginConfiguration
Expand All @@ -23,6 +24,10 @@ def run_job(context: JobContext) -> None:
dag_runner.DAG_CONFIG = DagPluginConfiguration(
context.core_context.configuration
)
dag_runner.JOB_NAME = context.name
dag_runner.EXECUTION_ID = context.core_context.state.get(
CommonStoreKeys.EXECUTION_ID
)

@staticmethod
@hookimpl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

TEAM_NAME: Optional[str] = None
DAG_CONFIG = None
JOB_NAME: Optional[str] = None
EXECUTION_ID: Optional[str] = None

log = logging.getLogger(__name__)

Expand All @@ -31,7 +33,7 @@ def run_dag(self, jobs: List[Dict]):
:param jobs: the list of jobs that are part of the DAG
:return:
"""
dag = Dag(TEAM_NAME, DAG_CONFIG)
dag = Dag(TEAM_NAME, DAG_CONFIG, JOB_NAME, EXECUTION_ID)
dag.build_dag(jobs)
dag.execute_dag()
log.info(f"DAG summary:\n{dag}")
13 changes: 10 additions & 3 deletions projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,16 @@ class IDataJobExecutor(abc.ABC):
"""

@abc.abstractmethod
def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None):
def start_job(
self,
job_name: str,
team_name: str,
started_by: str = None,
arguments: IJobArguments = None,
):
"""
Start an execution of a data job and returns its execution id
:param started_by:
:param arguments:
:param job_name:
:param team_name:
Expand All @@ -40,11 +47,11 @@ def status_job(self, job_name: str, team_name: str, execution_id: str):
@abc.abstractmethod
def details_job(self, job_name: str, team_name: str, execution_id: str) -> dict:
"""
Get the current status of a data job execution
Get the current details of a data job execution
:param job_name:
:param team_name:
:param execution_id:
:return: status in string as defined by Control Service API
:return: details in string as defined by Control Service API
"""
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(
job_name: str,
team_name: str,
rest_api_url: str,
started_by: str = None,
arguments: IJobArguments = None,
timeout: int = 5, # TODO: Set reasonable default
**kwargs,
Expand All @@ -67,6 +68,7 @@ def __init__(
self.__team_name = team_name
self.__rest_api_url = rest_api_url
self.__arguments = arguments
self.__started_by = started_by
self.timeout = timeout
self.deployment_id = "production" # currently multiple deployments are not supported so this remains hardcoded
self.auth: Optional[Authentication] = kwargs.pop("auth", None)
Expand All @@ -92,12 +94,12 @@ def __init__(

def start_job_execution(self) -> str:
"""
Triggers a manual Datajob execution.
Triggers a Datajob execution.

:param: request_kwargs: Request arguments to be included with the HTTP request
"""
execution_request = DataJobExecutionRequest(
started_by="dag", # TODO: specify name of dag
started_by=self.__started_by,
args=self.__arguments,
)
_, _, headers = self.__execution_api.data_job_execution_start_with_http_info(
Expand Down Expand Up @@ -138,7 +140,7 @@ def get_job_execution_log(self, execution_id: str) -> str:
execution_id=execution_id,
).logs

def get_job_execution_status(self, execution_id: str) -> DataJobExecution:
def get_job_execution_details(self, execution_id: str) -> DataJobExecution:
"""
Returns the execution status for a particular job execution.

Expand Down Expand Up @@ -186,7 +188,7 @@ def wait_for_job(
time.sleep(wait_seconds)

try:
job_execution = self.get_job_execution_status(execution_id)
job_execution = self.get_job_execution_details(execution_id)
job_status = job_execution.status
except Exception as err:
log.info("VDK Control Service returned error: %s", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,35 @@ class RemoteDataJobExecutor(IDataJobExecutor):
This module is responsible for executing remote Data Jobs.
"""

def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None):
def start_job(
self,
job_name: str,
team_name: str,
started_by: str = None,
arguments: IJobArguments = None,
):
vdk_cfg = VDKConfig()
job = RemoteDataJob(
job_name, team_name, vdk_cfg.control_service_rest_api_url, arguments
job_name,
team_name,
vdk_cfg.control_service_rest_api_url,
started_by,
arguments,
)
return job.start_job_execution()
# catch error on 409

def status_job(self, job_name: str, team_name: str, execution_id: str) -> str:
vdk_cfg = VDKConfig()
job = RemoteDataJob(job_name, team_name, vdk_cfg.control_service_rest_api_url)
status = job.get_job_execution_status(execution_id)
return status.status
details = job.get_job_execution_details(execution_id)
return details.status

def details_job(self, job_name: str, team_name: str, execution_id: str) -> dict:
vdk_cfg = VDKConfig()
job = RemoteDataJob(job_name, team_name, vdk_cfg.control_service_rest_api_url)
status = job.get_job_execution_status(execution_id)
return status.to_dict()
details = job.get_job_execution_details(execution_id)
return details.to_dict()

def job_executions_list(
self, job_name: str, team_name: str
Expand Down
Loading