From a3bff293a4cd22343a0fd05742cd3d1d2884bb2a Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 13 Jul 2023 06:41:59 +0200 Subject: [PATCH] Fix #12190 - Bring support for Airflow 2.6 & Use Airflow 2.6.3 as the new ingestion base (#12398) * Bump Airflow version * Support Airflow 2.6 * Lint * Bump airflow version * Fix airflow 2.6 lineage * Fix airflow 2.6 lineage --- ingestion/Dockerfile | 4 +- ingestion/Dockerfile.ci | 4 +- ingestion/setup.py | 2 +- .../source/pipeline/airflow/lineage_parser.py | 41 +++++++++++++++--- .../tests/unit/airflow/test_lineage_parser.py | 25 ++++++----- .../openmetadata_managed_apis/api/utils.py | 43 +++++++++++++++++-- .../deployment/ingestion/openmetadata.md | 4 +- 7 files changed, 95 insertions(+), 28 deletions(-) diff --git a/ingestion/Dockerfile b/ingestion/Dockerfile index cf6cf45adfad..7c949f89c04d 100644 --- a/ingestion/Dockerfile +++ b/ingestion/Dockerfile @@ -1,4 +1,4 @@ -FROM apache/airflow:2.3.3-python3.9 +FROM apache/airflow:2.6.3-python3.9 USER root RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - RUN curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list @@ -75,7 +75,7 @@ USER airflow # Argument to provide for Ingestion Dependencies to install. Defaults to all ARG INGESTION_DEPENDENCY="all" RUN pip install --upgrade pip -RUN pip install "openmetadata-managed-apis==1.2.0.0.dev0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.3.3/constraints-3.9.txt" +RUN pip install "openmetadata-managed-apis==1.2.0.0.dev0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.9.txt" RUN pip install "openmetadata-ingestion[${INGESTION_DEPENDENCY}]==1.2.0.0.dev0" # Temporary workaround for https://github.com/open-metadata/OpenMetadata/issues/9593 diff --git a/ingestion/Dockerfile.ci b/ingestion/Dockerfile.ci index 5f6dbebf8f07..3d15d480a692 100644 --- a/ingestion/Dockerfile.ci +++ b/ingestion/Dockerfile.ci @@ -1,4 +1,4 @@ -FROM apache/airflow:2.3.3-python3.9 +FROM apache/airflow:2.6.3-python3.9 USER root RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - RUN curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list @@ -74,7 +74,7 @@ COPY --chown=airflow:0 ingestion/examples/airflow/dags /opt/airflow/dags # Provide Execute Permissions to shell script RUN chmod +x /opt/airflow/ingestion_dependency.sh USER airflow -ARG AIRFLOW_CONSTRAINTS_LOCATION="https://raw.githubusercontent.com/apache/airflow/constraints-2.3.3/constraints-3.9.txt" +ARG AIRFLOW_CONSTRAINTS_LOCATION="https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.9.txt" # Argument to provide for Ingestion Dependencies to install. Defaults to all RUN pip install --upgrade pip diff --git a/ingestion/setup.py b/ingestion/setup.py index ab3482b69787..d5c91dd36865 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -28,7 +28,7 @@ def get_long_description(): # Add here versions required for multiple plugins VERSIONS = { - "airflow": "apache-airflow==2.3.3", + "airflow": "apache-airflow==2.6.3", "avro": "avro~=1.11", "boto3": "boto3>=1.20,<2.0", # No need to add botocore separately. It's a dep from boto3 "geoalchemy2": "GeoAlchemy2~=0.12", diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py index d999dc713715..7f62bc24d3a6 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py @@ -64,14 +64,25 @@ """ import logging import traceback +from enum import Enum from typing import Dict, List, Optional, Set from pydantic import BaseModel logger = logging.getLogger("airflow.task") -INLETS_ATTR = "_inlets" -OUTLETS_ATTR = "_outlets" + +class XLetsMode(Enum): + INLETS = "inlets" + OUTLETS = "outlets" + + +class XLetsAttr(Enum): + INLETS = "inlets" + PRIVATE_INLETS = "_inlets" + + OUTLETS = "outlets" + PRIVATE_OUTLETS = "_outlets" class XLets(BaseModel): @@ -107,7 +118,7 @@ def parse_xlets(xlet: List[dict]) -> Optional[Dict[str, List[str]]]: def get_xlets_from_operator( - operator: "BaseOperator", xlet_mode: str = INLETS_ATTR + operator: "BaseOperator", xlet_mode: XLetsMode ) -> Optional[Dict[str, List[str]]]: """ Given an Airflow DAG Task, obtain the tables @@ -120,7 +131,25 @@ def get_xlets_from_operator( :param xlet_mode: get inlet or outlet :return: list of tables FQN """ - xlet = getattr(operator, xlet_mode) if hasattr(operator, xlet_mode) else [] + attribute = None + if xlet_mode == XLetsMode.INLETS: + attribute = ( + XLetsAttr.INLETS.value + if hasattr(operator, XLetsAttr.INLETS.value) + else XLetsAttr.PRIVATE_INLETS.value + ) + + if xlet_mode == XLetsMode.OUTLETS: + attribute = ( + XLetsAttr.OUTLETS.value + if hasattr(operator, XLetsAttr.OUTLETS.value) + else XLetsAttr.PRIVATE_OUTLETS.value + ) + + if attribute is None: + raise ValueError(f"Missing attribute for {xlet_mode.value}") + + xlet = getattr(operator, attribute) or [] xlet_data = parse_xlets(xlet) if not xlet_data: @@ -146,14 +175,14 @@ def get_xlets_from_dag(dag: "DAG") -> List[XLets]: _inlets.update( get_xlets_from_operator( operator=task, - xlet_mode=INLETS_ATTR if hasattr(task, INLETS_ATTR) else "inlets", + xlet_mode=XLetsMode.INLETS, ) or [] ) _outlets.update( get_xlets_from_operator( operator=task, - xlet_mode=OUTLETS_ATTR if hasattr(task, INLETS_ATTR) else "outlets", + xlet_mode=XLetsMode.OUTLETS, ) or [] ) diff --git a/ingestion/tests/unit/airflow/test_lineage_parser.py b/ingestion/tests/unit/airflow/test_lineage_parser.py index 111497bdcfd4..9ae383241705 100644 --- a/ingestion/tests/unit/airflow/test_lineage_parser.py +++ b/ingestion/tests/unit/airflow/test_lineage_parser.py @@ -18,9 +18,8 @@ from airflow.operators.bash import BashOperator from metadata.ingestion.source.pipeline.airflow.lineage_parser import ( - INLETS_ATTR, - OUTLETS_ATTR, XLets, + XLetsMode, get_xlets_from_dag, get_xlets_from_operator, parse_xlets, @@ -64,11 +63,11 @@ def test_get_xlets_from_operator(self): outlets={"tables": ["A"]}, ) - # By default we try with inlets. There are none here - self.assertIsNone(get_xlets_from_operator(operator)) + self.assertIsNone(get_xlets_from_operator(operator, XLetsMode.INLETS)) # But the outlets are parsed correctly self.assertEqual( - get_xlets_from_operator(operator, xlet_mode=OUTLETS_ATTR), {"tables": ["A"]} + get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS), + {"tables": ["A"]}, ) operator = BashOperator( @@ -78,10 +77,12 @@ def test_get_xlets_from_operator(self): ) self.assertEqual( - get_xlets_from_operator(operator, xlet_mode=INLETS_ATTR), + get_xlets_from_operator(operator, xlet_mode=XLetsMode.INLETS), {"tables": ["A"], "more_tables": ["X"]}, ) - self.assertIsNone(get_xlets_from_operator(operator, xlet_mode=OUTLETS_ATTR)) + self.assertIsNone( + get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS) + ) def test_get_xlets_from_dag(self): """ @@ -89,6 +90,8 @@ def test_get_xlets_from_dag(self): all operators in the DAG """ + sleep_1 = "sleep 1" + with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag: BashOperator( task_id="print_date", @@ -98,7 +101,7 @@ def test_get_xlets_from_dag(self): BashOperator( task_id="sleep", - bash_command="sleep 1", + bash_command=sleep_1, outlets={"tables": ["B"]}, ) @@ -115,7 +118,7 @@ def test_get_xlets_from_dag(self): BashOperator( task_id="sleep", - bash_command="sleep 1", + bash_command=sleep_1, outlets={"tables": ["B"]}, ) @@ -136,7 +139,7 @@ def test_get_xlets_from_dag(self): BashOperator( task_id="sleep", - bash_command="sleep 1", + bash_command=sleep_1, outlets={ "tables": ["B"], "more_tables": ["Z"], @@ -162,7 +165,7 @@ def test_get_xlets_from_dag(self): BashOperator( task_id="sleep", - bash_command="sleep 1", + bash_command=sleep_1, outlets={ "tables": ["B"], }, diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py index ff91d632f82a..75b61a1bdf55 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py @@ -18,8 +18,8 @@ from typing import Optional from airflow import settings -from airflow.jobs.scheduler_job import SchedulerJob from airflow.models import DagBag +from airflow.version import version as airflow_version from flask import request from openmetadata_managed_apis.utils.logger import api_logger @@ -110,15 +110,50 @@ def get_dagbag(): class ScanDagsTask(Process): def run(self): - scheduler_job = SchedulerJob(num_times_parse_dags=1) - scheduler_job.heartrate = 0 - scheduler_job.run() + + if airflow_version >= "2.6": + scheduler_job = self._run_new_scheduler_job() + else: + scheduler_job = self._run_old_scheduler_job() try: scheduler_job.kill() except Exception as exc: logger.debug(traceback.format_exc()) logger.info(f"Rescan Complete: Killed Job: {exc}") + @staticmethod + def _run_new_scheduler_job() -> "Job": + """ + Run the new scheduler job from Airflow 2.6 + """ + from airflow.jobs.job import Job, run_job + from airflow.jobs.scheduler_job_runner import SchedulerJobRunner + + scheduler_job = Job() + job_runner = SchedulerJobRunner( + job=scheduler_job, + num_runs=1, + ) + scheduler_job.heartrate = 0 + + # pylint: disable=protected-access + run_job(scheduler_job, execute_callable=job_runner._execute) + + return scheduler_job + + @staticmethod + def _run_old_scheduler_job() -> "SchedulerJob": + """ + Run the old scheduler job before 2.6 + """ + from airflow.jobs.scheduler_job import SchedulerJob + + scheduler_job = SchedulerJob(num_times_parse_dags=1) + scheduler_job.heartrate = 0 + scheduler_job.run() + + return scheduler_job + def scan_dags_job_background(): """ diff --git a/openmetadata-docs/content/v1.1.0/deployment/ingestion/openmetadata.md b/openmetadata-docs/content/v1.1.0/deployment/ingestion/openmetadata.md index 3d2fc85a2108..555a8ef681c5 100644 --- a/openmetadata-docs/content/v1.1.0/deployment/ingestion/openmetadata.md +++ b/openmetadata-docs/content/v1.1.0/deployment/ingestion/openmetadata.md @@ -36,7 +36,6 @@ as a starting point. If you are using our `openmetadata/ingestion` Docker image, there is just one thing to do: Configure the OpenMetadata server. - The OpenMetadata server takes all its configurations from a YAML file. You can find them in our [repo](https://github.com/open-metadata/OpenMetadata/tree/main/conf). In `openmetadata.yaml`, update the `pipelineServiceClientConfiguration` section accordingly. @@ -90,7 +89,8 @@ openmetadata: ## Custom Airflow Installation {% note %} -Note that the `openmetadata-ingestion` only supports Python versions 3.7, 3.8 and 3.9. +- Note that the `openmetadata-ingestion` only supports Python versions 3.7, 3.8 and 3.9. +- The supported Airflow versions are 2.3, 2.4 and 2.5. From release 1.1.1 onwards, OpenMetadata will also support Airflow 2.6. {% /note %} You will need to follow three steps: