Skip to content

Commit

Permalink
Fix #12190 - Bring support for Airflow 2.6 & Use Airflow 2.6.3 as the new ingestion base (#12398)
Browse files Browse the repository at this point in the history

* Bump Airflow version

* Support Airflow 2.6

* Lint

* Bump airflow version

* Fix airflow 2.6 lineage

* Fix airflow 2.6 lineage
  • Loading branch information
pmbrull authored Jul 13, 2023
1 parent 3114496 commit a3bff29
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 28 deletions.
4 changes: 2 additions & 2 deletions ingestion/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM apache/airflow:2.3.3-python3.9
FROM apache/airflow:2.6.3-python3.9
USER root
RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
RUN curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
Expand Down Expand Up @@ -75,7 +75,7 @@ USER airflow
# Argument to provide for Ingestion Dependencies to install. Defaults to all
ARG INGESTION_DEPENDENCY="all"
RUN pip install --upgrade pip
RUN pip install "openmetadata-managed-apis==1.2.0.0.dev0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.3.3/constraints-3.9.txt"
RUN pip install "openmetadata-managed-apis==1.2.0.0.dev0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.9.txt"
RUN pip install "openmetadata-ingestion[${INGESTION_DEPENDENCY}]==1.2.0.0.dev0"

# Temporary workaround for https://github.com/open-metadata/OpenMetadata/issues/9593
Expand Down
4 changes: 2 additions & 2 deletions ingestion/Dockerfile.ci
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM apache/airflow:2.3.3-python3.9
FROM apache/airflow:2.6.3-python3.9
USER root
RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
RUN curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
Expand Down Expand Up @@ -74,7 +74,7 @@ COPY --chown=airflow:0 ingestion/examples/airflow/dags /opt/airflow/dags
# Provide Execute Permissions to shell script
RUN chmod +x /opt/airflow/ingestion_dependency.sh
USER airflow
ARG AIRFLOW_CONSTRAINTS_LOCATION="https://raw.githubusercontent.com/apache/airflow/constraints-2.3.3/constraints-3.9.txt"
ARG AIRFLOW_CONSTRAINTS_LOCATION="https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.9.txt"
# Argument to provide for Ingestion Dependencies to install. Defaults to all
RUN pip install --upgrade pip

Expand Down
2 changes: 1 addition & 1 deletion ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def get_long_description():

# Add here versions required for multiple plugins
VERSIONS = {
"airflow": "apache-airflow==2.3.3",
"airflow": "apache-airflow==2.6.3",
"avro": "avro~=1.11",
"boto3": "boto3>=1.20,<2.0", # No need to add botocore separately. It's a dep from boto3
"geoalchemy2": "GeoAlchemy2~=0.12",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,25 @@
"""
import logging
import traceback
from enum import Enum
from typing import Dict, List, Optional, Set

from pydantic import BaseModel

logger = logging.getLogger("airflow.task")

INLETS_ATTR = "_inlets"
OUTLETS_ATTR = "_outlets"

# Whether lineage should be read from a task's inlets or its outlets.
XLetsMode = Enum("XLetsMode", [("INLETS", "inlets"), ("OUTLETS", "outlets")])


# Operator attribute names where lineage may live: the public
# inlets/outlets attributes, or the private _inlets/_outlets fallbacks.
XLetsAttr = Enum(
    "XLetsAttr",
    [
        ("INLETS", "inlets"),
        ("PRIVATE_INLETS", "_inlets"),
        ("OUTLETS", "outlets"),
        ("PRIVATE_OUTLETS", "_outlets"),
    ],
)


class XLets(BaseModel):
Expand Down Expand Up @@ -107,7 +118,7 @@ def parse_xlets(xlet: List[dict]) -> Optional[Dict[str, List[str]]]:


def get_xlets_from_operator(
operator: "BaseOperator", xlet_mode: str = INLETS_ATTR
operator: "BaseOperator", xlet_mode: XLetsMode
) -> Optional[Dict[str, List[str]]]:
"""
Given an Airflow DAG Task, obtain the tables
Expand All @@ -120,7 +131,25 @@ def get_xlets_from_operator(
:param xlet_mode: get inlet or outlet
:return: list of tables FQN
"""
xlet = getattr(operator, xlet_mode) if hasattr(operator, xlet_mode) else []
attribute = None
if xlet_mode == XLetsMode.INLETS:
attribute = (
XLetsAttr.INLETS.value
if hasattr(operator, XLetsAttr.INLETS.value)
else XLetsAttr.PRIVATE_INLETS.value
)

if xlet_mode == XLetsMode.OUTLETS:
attribute = (
XLetsAttr.OUTLETS.value
if hasattr(operator, XLetsAttr.OUTLETS.value)
else XLetsAttr.PRIVATE_OUTLETS.value
)

if attribute is None:
raise ValueError(f"Missing attribute for {xlet_mode.value}")

xlet = getattr(operator, attribute) or []
xlet_data = parse_xlets(xlet)

if not xlet_data:
Expand All @@ -146,14 +175,14 @@ def get_xlets_from_dag(dag: "DAG") -> List[XLets]:
_inlets.update(
get_xlets_from_operator(
operator=task,
xlet_mode=INLETS_ATTR if hasattr(task, INLETS_ATTR) else "inlets",
xlet_mode=XLetsMode.INLETS,
)
or []
)
_outlets.update(
get_xlets_from_operator(
operator=task,
xlet_mode=OUTLETS_ATTR if hasattr(task, INLETS_ATTR) else "outlets",
xlet_mode=XLetsMode.OUTLETS,
)
or []
)
Expand Down
25 changes: 14 additions & 11 deletions ingestion/tests/unit/airflow/test_lineage_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
from airflow.operators.bash import BashOperator

from metadata.ingestion.source.pipeline.airflow.lineage_parser import (
INLETS_ATTR,
OUTLETS_ATTR,
XLets,
XLetsMode,
get_xlets_from_dag,
get_xlets_from_operator,
parse_xlets,
Expand Down Expand Up @@ -64,11 +63,11 @@ def test_get_xlets_from_operator(self):
outlets={"tables": ["A"]},
)

# By default we try with inlets. There are none here
self.assertIsNone(get_xlets_from_operator(operator))
self.assertIsNone(get_xlets_from_operator(operator, XLetsMode.INLETS))
# But the outlets are parsed correctly
self.assertEqual(
get_xlets_from_operator(operator, xlet_mode=OUTLETS_ATTR), {"tables": ["A"]}
get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS),
{"tables": ["A"]},
)

operator = BashOperator(
Expand All @@ -78,17 +77,21 @@ def test_get_xlets_from_operator(self):
)

self.assertEqual(
get_xlets_from_operator(operator, xlet_mode=INLETS_ATTR),
get_xlets_from_operator(operator, xlet_mode=XLetsMode.INLETS),
{"tables": ["A"], "more_tables": ["X"]},
)
self.assertIsNone(get_xlets_from_operator(operator, xlet_mode=OUTLETS_ATTR))
self.assertIsNone(
get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS)
)

def test_get_xlets_from_dag(self):
"""
Check that we can properly join the xlet information from
all operators in the DAG
"""

sleep_1 = "sleep 1"

with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
BashOperator(
task_id="print_date",
Expand All @@ -98,7 +101,7 @@ def test_get_xlets_from_dag(self):

BashOperator(
task_id="sleep",
bash_command="sleep 1",
bash_command=sleep_1,
outlets={"tables": ["B"]},
)

Expand All @@ -115,7 +118,7 @@ def test_get_xlets_from_dag(self):

BashOperator(
task_id="sleep",
bash_command="sleep 1",
bash_command=sleep_1,
outlets={"tables": ["B"]},
)

Expand All @@ -136,7 +139,7 @@ def test_get_xlets_from_dag(self):

BashOperator(
task_id="sleep",
bash_command="sleep 1",
bash_command=sleep_1,
outlets={
"tables": ["B"],
"more_tables": ["Z"],
Expand All @@ -162,7 +165,7 @@ def test_get_xlets_from_dag(self):

BashOperator(
task_id="sleep",
bash_command="sleep 1",
bash_command=sleep_1,
outlets={
"tables": ["B"],
},
Expand Down
43 changes: 39 additions & 4 deletions openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
from typing import Optional

from airflow import settings
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.models import DagBag
from airflow.version import version as airflow_version
from flask import request
from openmetadata_managed_apis.utils.logger import api_logger

Expand Down Expand Up @@ -110,15 +110,50 @@ def get_dagbag():

class ScanDagsTask(Process):
    """
    Background process that forces a single Airflow scheduler parsing
    loop so that freshly deployed DAG files get picked up right away.
    """

    def run(self):
        # Compare versions numerically: a plain string comparison would
        # break for Airflow >= 2.10 ("2.10" < "2.6" lexicographically).
        if self._version_tuple(airflow_version) >= (2, 6):
            scheduler_job = self._run_new_scheduler_job()
        else:
            scheduler_job = self._run_old_scheduler_job()
        try:
            scheduler_job.kill()
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.info(f"Rescan Complete: Killed Job: {exc}")

    @staticmethod
    def _version_tuple(version: str) -> tuple:
        """
        Parse the leading numeric components of a version string,
        e.g. "2.6.3" -> (2, 6, 3). Stops at the first non-numeric
        segment so suffixes such as "2.6.3rc1" do not raise.
        """
        parts = []
        for segment in version.split("."):
            digits = ""
            for char in segment:
                if char.isdigit():
                    digits += char
                else:
                    break
            if not digits:
                break
            parts.append(int(digits))
        return tuple(parts)

    @staticmethod
    def _run_new_scheduler_job() -> "Job":
        """
        Run the scheduler through the Job / JobRunner split introduced
        in Airflow 2.6 and return the finished Job.
        """
        from airflow.jobs.job import Job, run_job
        from airflow.jobs.scheduler_job_runner import SchedulerJobRunner

        scheduler_job = Job()
        job_runner = SchedulerJobRunner(
            job=scheduler_job,
            num_runs=1,
        )
        scheduler_job.heartrate = 0

        # pylint: disable=protected-access
        run_job(scheduler_job, execute_callable=job_runner._execute)

        return scheduler_job

    @staticmethod
    def _run_old_scheduler_job() -> "SchedulerJob":
        """
        Run the pre-2.6 SchedulerJob API and return the finished job.
        """
        from airflow.jobs.scheduler_job import SchedulerJob

        scheduler_job = SchedulerJob(num_times_parse_dags=1)
        scheduler_job.heartrate = 0
        scheduler_job.run()

        return scheduler_job


def scan_dags_job_background():
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ as a starting point.

If you are using our `openmetadata/ingestion` Docker image, there is just one thing to do: Configure the OpenMetadata server.


The OpenMetadata server takes all its configurations from a YAML file. You can find them in our [repo](https://github.com/open-metadata/OpenMetadata/tree/main/conf). In
`openmetadata.yaml`, update the `pipelineServiceClientConfiguration` section accordingly.

Expand Down Expand Up @@ -90,7 +89,8 @@ openmetadata:
## Custom Airflow Installation
{% note %}
- Note that the `openmetadata-ingestion` only supports Python versions 3.7, 3.8 and 3.9.
- The supported Airflow versions are 2.3, 2.4 and 2.5. From release 1.1.1 onwards, OpenMetadata will also support Airflow 2.6.
{% /note %}

You will need to follow three steps:
Expand Down

0 comments on commit a3bff29

Please sign in to comment.