From d2243a7576c654526e09891d5421be47ec684816 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Wed, 22 Jan 2025 14:40:01 +0530 Subject: [PATCH] Fix failures on main related to DagRun validation Follow-up of https://github.com/apache/airflow/pull/45834/ --- airflow/models/taskinstance.py | 3 +++ providers/tests/apache/kylin/operators/test_kylin_cube.py | 2 ++ providers/tests/apache/spark/operators/test_spark_submit.py | 3 +++ providers/tests/openlineage/plugins/test_listener.py | 4 ++-- providers/tests/redis/log/test_redis_task_handler.py | 1 + 5 files changed, 11 insertions(+), 2 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index c0f6d20703b61..015376ec15885 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2991,6 +2991,9 @@ def run( raise_on_defer: bool = False, ) -> None: """Run TaskInstance.""" + if self.dag_run is not None: + self.dag_run.set_state(DagRunState.RUNNING) + res = self.check_and_change_state_before_execution( verbose=verbose, ignore_all_deps=ignore_all_deps, diff --git a/providers/tests/apache/kylin/operators/test_kylin_cube.py b/providers/tests/apache/kylin/operators/test_kylin_cube.py index b23833811f618..14b71c4896b1c 100644 --- a/providers/tests/apache/kylin/operators/test_kylin_cube.py +++ b/providers/tests/apache/kylin/operators/test_kylin_cube.py @@ -177,6 +177,7 @@ def test_render_template(self, session): run_id="kylin_test", logical_date=DEFAULT_DATE, run_type=DagRunType.MANUAL, + state="running", ) else: ti.dag_run = DagRun( @@ -184,6 +185,7 @@ def test_render_template(self, session): run_id="kylin_test", execution_date=DEFAULT_DATE, run_type=DagRunType.MANUAL, + state="running", ) session.add(ti) session.commit() diff --git a/providers/tests/apache/spark/operators/test_spark_submit.py b/providers/tests/apache/spark/operators/test_spark_submit.py index 879a7c999efce..2670340086bb7 100644 --- a/providers/tests/apache/spark/operators/test_spark_submit.py +++ b/providers/tests/apache/spark/operators/test_spark_submit.py @@ -201,6 +201,7 @@ def test_render_template(self, session): run_id="spark_test", logical_date=DEFAULT_DATE, run_type=DagRunType.MANUAL, + state="running", ) else: ti.dag_run = DagRun( @@ -208,7 +209,9 @@ def test_render_template(self, session): run_id="spark_test", execution_date=DEFAULT_DATE, run_type=DagRunType.MANUAL, + state="running", ) + session.add(ti) session.commit() # When diff --git a/providers/tests/openlineage/plugins/test_listener.py b/providers/tests/openlineage/plugins/test_listener.py index 837873f439d24..25d9c24e6e519 100644 --- a/providers/tests/openlineage/plugins/test_listener.py +++ b/providers/tests/openlineage/plugins/test_listener.py @@ -101,7 +101,7 @@ def test_listener_does_not_change_task_instance(render_mock, xcom_push_mock): run_id=run_id, data_interval=(date, date), run_type=types.DagRunType.MANUAL, - state=DagRunState.QUEUED, + state=DagRunState.RUNNING, **dagrun_kwargs, ) ti = TaskInstance(t, run_id=run_id) @@ -187,7 +187,7 @@ def sample_callable(**kwargs): run_id=run_id, data_interval=(date, date), run_type=types.DagRunType.MANUAL, - state=DagRunState.QUEUED, + state=DagRunState.RUNNING, **dagrun_kwargs, ) task_instance = TaskInstance(t, run_id=run_id) diff --git a/providers/tests/redis/log/test_redis_task_handler.py b/providers/tests/redis/log/test_redis_task_handler.py index 604dcb1c73147..2dde6fa402b14 100644 --- a/providers/tests/redis/log/test_redis_task_handler.py +++ b/providers/tests/redis/log/test_redis_task_handler.py @@ -46,6 +46,7 @@ def ti(self): else: dag_run = DagRun(dag_id=dag.dag_id, execution_date=date, run_id="test", run_type="scheduled") + dag_run.set_state(State.RUNNING) with create_session() as session: session.add(dag_run) session.commit()