Skip to content

Commit

Permalink
vdk-dag: DAGs propagate their execution type to their component jobs (#…
Browse files Browse the repository at this point in the history
…2080)

What:
Currently, jobs ran by DAGs are labelled manual executions. This is due
to the execution being triggered through the Execution API. Ideally, we
would like that if a DAGs is a manual execution, then all component jobs
are also manual execution, and likewise for scheduled runs. This change
achieves this.

Why: To achieve consistency with the data jobs' execution type.

Testing Done: all tests are passing, introduced unit test

Signed-off-by: Yoan Salambashev <[email protected]>
  • Loading branch information
yonitoo authored May 19, 2023
1 parent 3c3bdbe commit a256ba9
Show file tree
Hide file tree
Showing 11 changed files with 217 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def read_access_token(self) -> str:
class DummyStatusResponse:
status = 200
data = b"ddf"
headers = {"Location": "job-execution-id-01"}

def getheader(self, str):
return "json"
Expand All @@ -39,7 +40,7 @@ def setUp(self):

@mock.patch("taurus_datajob_api.api_client.ApiClient.call_api")
def test_start_job_execution(self, mock_call_api):
mock_call_api.return_value = (None, None, {"Location": "job-execution-id-01"})
mock_call_api.return_value = DummyStatusResponse()

self.hook.start_job_execution()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
from vdk_provider.operators.vdk import VDKOperator


class DummyApiResponse:
headers = {"Location": "job-execution-id-01"}


def call_api_return_func(*args, **kwargs):
if (
args[0]
Expand All @@ -21,7 +25,7 @@ def call_api_return_func(*args, **kwargs):
"deployment_id": "production",
}
):
return None, None, {"Location": "job-execution-id-01"}
return DummyApiResponse()
elif (
args[0]
== "/data-jobs/for-team/{team_name}/jobs/{job_name}/executions/{execution_id}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ def start_job_execution(self, **request_kwargs) -> str:
started_by="airflow-provider-vdk",
args=request_kwargs,
)
_, _, headers = self.__execution_api.data_job_execution_start_with_http_info(
headers = self.__execution_api.data_job_execution_start_with_http_info(
team_name=self.team_name,
job_name=self.job_name,
deployment_id=self.deployment_id,
data_job_execution_request=execution_request,
_request_timeout=self.timeout,
)
).headers
log.debug(f"Received headers: {headers}")

job_execution_id = os.path.basename(headers["Location"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ def start_job(self, job_name: str) -> None:
job = self.__get_job(job_name)
job.start_attempt += 1
execution_id = self.start_new_job_execution(
job_name=job.job_name, team_name=job.team_name, arguments=job.arguments
job_name=job.job_name,
team_name=job.team_name,
started_by=job.details.get("started_by"),
arguments=job.arguments,
)
log.info(f"Starting new data job execution with id {execution_id}")
job.execution_id = execution_id
Expand Down Expand Up @@ -138,6 +141,20 @@ def status(self, job_name: str) -> str:
log.debug(f"Job status: {job}")
return job.status

def execution_type(self, job_name: str, team_name: str, execution_id: str) -> str:
"""
Gets the execution type of a job.
:param execution_id: the execution id of the job
:param team_name: the name of the owning team
:param job_name: the name of the job
:return: the job execution type (manual/scheduled)
"""
details = self._executor.details_job(job_name, team_name, execution_id)
log.debug(f"Job execution type: {details.get('type')}")
# the default value for execution type is manual
return details.get("type", "manual")

def get_finished_job_names(self):
"""
:return: list of the names of all the finalized jobs
Expand Down Expand Up @@ -175,7 +192,11 @@ def get_currently_running_jobs(self):
return [j for j in self._jobs_cache.values() if j.status in ACTIVE_JOB_STATUSES]

def start_new_job_execution(
self, job_name: str, team_name: str, arguments: IJobArguments = None
self,
job_name: str,
team_name: str,
started_by: str = None,
arguments: IJobArguments = None,
) -> str:
"""
Start a new data job execution.
Expand All @@ -193,6 +214,7 @@ def start_new_job_execution(
:param job_name: name of the data job to be executed
:param team_name: name of the owning team
:param started_by: the execution type and the name of the DAG job
:param arguments: arguments of the data job
:return: id of the started job execution
"""
Expand All @@ -205,7 +227,9 @@ def start_new_job_execution(

while current_retries < ALLOWED_RETRIES:
try:
execution_id = self._executor.start_job(job_name, team_name, arguments)
execution_id = self._executor.start_job(
job_name, team_name, started_by, arguments
)
return execution_id
except url_exception.TimeoutError as e:
log.info(
Expand Down
36 changes: 35 additions & 1 deletion projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@


class Dag:
def __init__(self, team_name: str, dags_config: DagPluginConfiguration):
def __init__(
self,
team_name: str,
dags_config: DagPluginConfiguration,
job_name: str = None,
execution_id: str = None,
):
"""
This module deals with all the DAG-related operations such as build and execute.
Expand All @@ -47,6 +53,32 @@ def __init__(self, team_name: str, dags_config: DagPluginConfiguration):
time_between_status_check_seconds=dags_config.dags_time_between_status_check_seconds(),
)
self._dag_validator = DagValidator()
if job_name is not None:
if execution_id is not None:
try:
self._started_by = (
self._job_executor.execution_type(
job_name, team_name, execution_id
)
+ "/"
+ job_name
)
except ApiException as e:
if e.status == 404:
log.debug(
f"Job {job_name} of team {team_name} with execution id {execution_id} failed. "
f"Local job runs return 404 status when getting the execution type: {e}"
)
else:
log.info(
f"Unexpected error while checking for job execution type for job {job_name} "
f"with execution id {execution_id} of team {team_name}: {e}"
)
self._started_by = f"manual/{job_name}"
else:
self._started_by = f"manual/{job_name}"
else:
self._started_by = "manual/default"

def build_dag(self, jobs: List[Dict]):
"""
Expand All @@ -62,7 +94,9 @@ def build_dag(self, jobs: List[Dict]):
job.get("team_name", self._team_name),
job.get("fail_dag_on_error", True),
job.get("arguments", None),
job.get("details", {}),
)
trackable_job.details = {"started_by": self._started_by}
self._job_executor.register_job(trackable_job)
self._topological_sorter.add(trackable_job.job_name, *job["depends_on"])

Expand Down
5 changes: 5 additions & 0 deletions projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from vdk.internal.builtin_plugins.config.job_config import JobConfigKeys
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core.config import ConfigurationBuilder
from vdk.internal.core.statestore import CommonStoreKeys
from vdk.plugin.dag import dag_runner
from vdk.plugin.dag.dag_plugin_configuration import add_definitions
from vdk.plugin.dag.dag_plugin_configuration import DagPluginConfiguration
Expand All @@ -23,6 +24,10 @@ def run_job(context: JobContext) -> None:
dag_runner.DAG_CONFIG = DagPluginConfiguration(
context.core_context.configuration
)
dag_runner.JOB_NAME = context.name
dag_runner.EXECUTION_ID = context.core_context.state.get(
CommonStoreKeys.EXECUTION_ID
)

@staticmethod
@hookimpl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

TEAM_NAME: Optional[str] = None
DAG_CONFIG = None
JOB_NAME: Optional[str] = None
EXECUTION_ID: Optional[str] = None

log = logging.getLogger(__name__)

Expand All @@ -31,7 +33,7 @@ def run_dag(self, jobs: List[Dict]):
:param jobs: the list of jobs that are part of the DAG
:return:
"""
dag = Dag(TEAM_NAME, DAG_CONFIG)
dag = Dag(TEAM_NAME, DAG_CONFIG, JOB_NAME, EXECUTION_ID)
dag.build_dag(jobs)
dag.execute_dag()
log.info(f"DAG summary:\n{dag}")
13 changes: 10 additions & 3 deletions projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,16 @@ class IDataJobExecutor(abc.ABC):
"""

@abc.abstractmethod
def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None):
def start_job(
self,
job_name: str,
team_name: str,
started_by: str = None,
arguments: IJobArguments = None,
):
"""
Start an execution of a data job and returns its execution id
:param started_by:
:param arguments:
:param job_name:
:param team_name:
Expand All @@ -40,11 +47,11 @@ def status_job(self, job_name: str, team_name: str, execution_id: str):
@abc.abstractmethod
def details_job(self, job_name: str, team_name: str, execution_id: str) -> dict:
"""
Get the current status of a data job execution
Get the current details of a data job execution
:param job_name:
:param team_name:
:param execution_id:
:return: status in string as defined by Control Service API
:return: details in string as defined by Control Service API
"""
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(
job_name: str,
team_name: str,
rest_api_url: str,
started_by: str = None,
arguments: IJobArguments = None,
timeout: int = 5, # TODO: Set reasonable default
**kwargs,
Expand All @@ -67,6 +68,7 @@ def __init__(
self.__team_name = team_name
self.__rest_api_url = rest_api_url
self.__arguments = arguments
self.__started_by = started_by
self.timeout = timeout
self.deployment_id = "production" # currently multiple deployments are not supported so this remains hardcoded
self.auth: Optional[Authentication] = kwargs.pop("auth", None)
Expand All @@ -92,21 +94,21 @@ def __init__(

def start_job_execution(self) -> str:
"""
Triggers a manual Datajob execution.
Triggers a Datajob execution.
:param: request_kwargs: Request arguments to be included with the HTTP request
"""
execution_request = DataJobExecutionRequest(
started_by="dag", # TODO: specify name of dag
started_by=self.__started_by,
args=self.__arguments,
)
_, _, headers = self.__execution_api.data_job_execution_start_with_http_info(
headers = self.__execution_api.data_job_execution_start_with_http_info(
team_name=self.__team_name,
job_name=self.__job_name,
deployment_id=self.deployment_id,
data_job_execution_request=execution_request,
_request_timeout=self.timeout,
)
).headers
log.debug(f"Received headers: {headers}")

job_execution_id = os.path.basename(headers["Location"])
Expand Down Expand Up @@ -138,7 +140,7 @@ def get_job_execution_log(self, execution_id: str) -> str:
execution_id=execution_id,
).logs

def get_job_execution_status(self, execution_id: str) -> DataJobExecution:
def get_job_execution_details(self, execution_id: str) -> DataJobExecution:
"""
Returns the execution status for a particular job execution.
Expand Down Expand Up @@ -186,7 +188,7 @@ def wait_for_job(
time.sleep(wait_seconds)

try:
job_execution = self.get_job_execution_status(execution_id)
job_execution = self.get_job_execution_details(execution_id)
job_status = job_execution.status
except Exception as err:
log.info("VDK Control Service returned error: %s", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,35 @@ class RemoteDataJobExecutor(IDataJobExecutor):
This module is responsible for executing remote Data Jobs.
"""

def start_job(self, job_name: str, team_name: str, arguments: IJobArguments = None):
def start_job(
self,
job_name: str,
team_name: str,
started_by: str = None,
arguments: IJobArguments = None,
):
vdk_cfg = VDKConfig()
job = RemoteDataJob(
job_name, team_name, vdk_cfg.control_service_rest_api_url, arguments
job_name,
team_name,
vdk_cfg.control_service_rest_api_url,
started_by,
arguments,
)
return job.start_job_execution()
# catch error on 409

def status_job(self, job_name: str, team_name: str, execution_id: str) -> str:
vdk_cfg = VDKConfig()
job = RemoteDataJob(job_name, team_name, vdk_cfg.control_service_rest_api_url)
status = job.get_job_execution_status(execution_id)
return status.status
details = job.get_job_execution_details(execution_id)
return details.status

def details_job(self, job_name: str, team_name: str, execution_id: str) -> dict:
vdk_cfg = VDKConfig()
job = RemoteDataJob(job_name, team_name, vdk_cfg.control_service_rest_api_url)
status = job.get_job_execution_status(execution_id)
return status.to_dict()
details = job.get_job_execution_details(execution_id)
return details.to_dict()

def job_executions_list(
self, job_name: str, team_name: str
Expand Down
Loading

0 comments on commit a256ba9

Please sign in to comment.