Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move external task sensor to standard provider #44288

6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ repos:
^airflow/hooks/.*$|
^airflow/operators/.*$|
^providers/src/airflow/providers/.*$|
^airflow/sensors/.*$|
^providers/src/airflow/providers/standard/sensors/.*$|
^dev/provider_packages/.*$
- id: check-base-operator-usage
language: pygrep
Expand All @@ -714,7 +714,7 @@ repos:
^airflow/hooks/.*$|
^airflow/operators/.*$|
^providers/src/airflow/providers/.*$|
^airflow/sensors/.*$|
^providers/src/airflow/providers/standard/sensors/.*$|
^dev/provider_packages/.*$
- id: check-base-operator-usage
language: pygrep
Expand All @@ -725,7 +725,7 @@ repos:
files: >
(?x)
^providers/src/airflow/providers/.*\.py$
exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py|providers/src/airflow/providers/standard/operators/python.py
exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py|providers/src/airflow/providers/standard/operators/python.py|providers/src/airflow/providers/standard/sensors/external_task.py
- id: check-get-lineage-collector-providers
language: python
name: Check providers import hook lineage code from compat
Expand Down
8 changes: 4 additions & 4 deletions RELEASE_NOTES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6450,10 +6450,10 @@ The following table shows changes in import paths.
- ``airflow.sensors.base.BaseSensorOperator``
* - ``airflow.sensors.date_time_sensor.DateTimeSensor``
- ``airflow.sensors.date_time.DateTimeSensor``
* - ``airflow.sensors.external_task_sensor.ExternalTaskMarker``
- ``airflow.sensors.external_task.ExternalTaskMarker``
* - ``airflow.sensors.external_task_sensor.ExternalTaskSensor``
- ``airflow.sensors.external_task.ExternalTaskSensor``
* - ``airflow.providers.standard.sensors.external_task_sensor.ExternalTaskMarker``
kunaljubce marked this conversation as resolved.
Show resolved Hide resolved
- ``airflow.providers.standard.sensors.external_task.ExternalTaskMarker``
* - ``airflow.providers.standard.sensors.external_task_sensor.ExternalTaskSensor``
- ``airflow.providers.standard.sensors.external_task.ExternalTaskSensor``
* - ``airflow.sensors.sql_sensor.SqlSensor``
- ``airflow.sensors.sql.SqlSensor``
* - ``airflow.sensors.time_delta_sensor.TimeDeltaSensor``
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_external_task_marker_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor

start_date = pendulum.datetime(2021, 1, 1, tz="UTC")

Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ def _get_task_instances(

if include_dependent_dags:
# Recursively find external tasks indicated by ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.providers.standard.sensors.external_task import ExternalTaskMarker

query = tis
if as_pk_tuple:
Expand Down
4 changes: 2 additions & 2 deletions airflow/reproducible_build.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
release-notes-hash: c68f3fa23f84c7fc270d73baaa2cc18d
source-date-epoch: 1731415143
release-notes-hash: a04c0a95a5b972498a29924ff4b53216
source-date-epoch: 1732286028
8 changes: 4 additions & 4 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@

_OPERATOR_EXTRA_LINKS: set[str] = {
"airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunLink",
"airflow.sensors.external_task.ExternalDagLink",
"airflow.providers.standard.sensors.external_task.ExternalDagLink",
# Deprecated names, so that existing serialized dags load straight away.
"airflow.sensors.external_task.ExternalTaskSensorLink",
"airflow.providers.standard.sensors.external_task.ExternalTaskSensorLink",
"airflow.operators.dagrun_operator.TriggerDagRunLink",
"airflow.sensors.external_task_sensor.ExternalTaskSensorLink",
"airflow.providers.standard.sensors.external_task_sensor.ExternalTaskSensorLink",
}


Expand Down Expand Up @@ -1022,7 +1022,7 @@ class DependencyDetector:
def detect_task_dependencies(task: Operator) -> list[DagDependency]:
"""Detect dependencies caused by tasks."""
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor

deps = []
if isinstance(task, TriggerDagRunOperator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ DAGs.
ExternalTaskSensor
^^^^^^^^^^^^^^^^^^

Use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
Use the :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
wait for another task on a different DAG for a specific ``execution_date``.

ExternalTaskSensor also provide options to set if the Task on a remote DAG succeeded or failed
Expand All @@ -64,7 +64,7 @@ Also for this action you can use sensor in the deferrable mode:

ExternalTaskSensor with task_group dependency
---------------------------------------------
In Addition, we can also use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
In Addition, we can also use the :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
wait for another ``task_group`` on a different DAG for a specific ``execution_date``.

.. exampleinclude:: /../../airflow/example_dags/example_external_task_marker_dag.py
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/core-concepts/dags.rst
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ relationships, dependencies between DAGs are a bit more complex. In general, the
in which one DAG can depend on another:

- triggering - :class:`~airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunOperator`
- waiting - :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor`
- waiting - :class:`~airflow.providers.standard.sensors.external_task_sensor.ExternalTaskSensor`

Additional difficulty is that one DAG could wait for or trigger several runs of the other DAG
with different data intervals. The **Dag Dependencies** view
Expand Down
2 changes: 0 additions & 2 deletions docs/apache-airflow/howto/operator/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,3 @@ determine what actually executes when your DAG runs.

.. toctree::
:maxdepth: 2

external_task_sensor
5 changes: 2 additions & 3 deletions docs/apache-airflow/operators-and-hooks-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`.
* - Sensors
- Guides

* - :mod:`airflow.sensors.external_task`
- :doc:`How to use <howto/operator/external_task_sensor>`

* - :mod:`airflow.sensors.base`
-

**Hooks:**

Expand Down
4 changes: 2 additions & 2 deletions newsfragments/41391.significant.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
**Breaking Change**

The ``airflow.sensors.external_task.ExternalTaskSensorLink`` class has been removed.
The ``airflow.providers.standard.sensors.external_task.ExternalTaskSensorLink`` class has been removed.
This class was deprecated and is no longer available. Users should now use
the ``airflow.sensors.external_task.ExternalDagLink`` class directly.
the ``airflow.providers.standard.sensors.external_task.ExternalDagLink`` class directly.
1 change: 1 addition & 0 deletions providers/src/airflow/providers/standard/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ sensors:
- airflow.providers.standard.sensors.bash
- airflow.providers.standard.sensors.python
- airflow.providers.standard.sensors.filesystem
- airflow.providers.standard.sensors.external_task
hooks:
- integration-name: Standard
python-modules:
Expand Down
2 changes: 1 addition & 1 deletion tests/dags/test_external_task_sensor_check_existense.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor

from tests.models import DEFAULT_DATE

Expand Down
12 changes: 6 additions & 6 deletions tests/sensors/test_external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.sensors.time import TimeSensor
from airflow.providers.standard.triggers.external_task import WorkflowTrigger
from airflow.sensors.external_task import (
from airflow.providers.standard.sensors.external_task import (
ExternalTaskMarker,
ExternalTaskSensor,
)
from airflow.providers.standard.sensors.time import TimeSensor
from airflow.providers.standard.triggers.external_task import WorkflowTrigger
from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.session import NEW_SESSION, create_session, provide_session
Expand Down Expand Up @@ -934,8 +934,8 @@ def test_external_task_group_when_there_is_no_TIs(self):
),
),
)
@mock.patch("airflow.sensors.external_task.ExternalTaskSensor.get_count")
@mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter")
@mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor.get_count")
@mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor._get_dttm_filter")
def test_fail_poke(
self, _get_dttm_filter, get_count, soft_fail, expected_exception, kwargs, expected_message
):
Expand Down Expand Up @@ -991,7 +991,7 @@ def test_fail_poke(
),
),
)
@mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter")
@mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor._get_dttm_filter")
@mock.patch("airflow.models.dagbag.DagBag.get_dag")
@mock.patch("os.path.exists")
@mock.patch("airflow.models.dag.DagModel.get_current")
Expand Down
8 changes: 4 additions & 4 deletions tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -1511,7 +1511,7 @@ def test_deps_sorted(self):
Tests serialize_operator, make sure the deps is in order
"""
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor

logical_date = datetime(2020, 1, 1)
with DAG(dag_id="test_deps_sorted", schedule=None, start_date=logical_date) as dag:
Expand Down Expand Up @@ -1626,7 +1626,7 @@ def test_derived_dag_deps_sensor(self):
Tests DAG dependency detection for sensors, including derived classes
"""
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor

class DerivedSensor(ExternalTaskSensor):
pass
Expand Down Expand Up @@ -1657,7 +1657,7 @@ def test_dag_deps_assets_with_duplicate_asset(self):
"""
Check that dag_dependencies node is populated correctly for a DAG with duplicate assets.
"""
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor

d1 = Asset("d1")
d2 = Asset("d2")
Expand Down Expand Up @@ -1746,7 +1746,7 @@ def test_dag_deps_assets(self):
"""
Check that dag_dependencies node is populated correctly for a DAG with assets.
"""
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor

d1 = Asset("d1")
d2 = Asset("d2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
from airflow.utils.timezone import datetime

with DAG(
Expand Down