Skip to content

Commit

Permalink
Fix failures on main related to DagRun validation
Browse files Browse the repository at this point in the history
Follow-up of #45834
  • Loading branch information
kaxil committed Jan 22, 2025
1 parent a9bad24 commit d2243a7
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 2 deletions.
3 changes: 3 additions & 0 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2991,6 +2991,9 @@ def run(
raise_on_defer: bool = False,
) -> None:
"""Run TaskInstance."""
if self.dag_run is not None:
self.dag_run.set_state(DagRunState.RUNNING)

res = self.check_and_change_state_before_execution(
verbose=verbose,
ignore_all_deps=ignore_all_deps,
Expand Down
2 changes: 2 additions & 0 deletions providers/tests/apache/kylin/operators/test_kylin_cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,15 @@ def test_render_template(self, session):
run_id="kylin_test",
logical_date=DEFAULT_DATE,
run_type=DagRunType.MANUAL,
state="running",
)
else:
ti.dag_run = DagRun(
dag_id=self.dag.dag_id,
run_id="kylin_test",
execution_date=DEFAULT_DATE,
run_type=DagRunType.MANUAL,
state="running",
)
session.add(ti)
session.commit()
Expand Down
3 changes: 3 additions & 0 deletions providers/tests/apache/spark/operators/test_spark_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,17 @@ def test_render_template(self, session):
run_id="spark_test",
logical_date=DEFAULT_DATE,
run_type=DagRunType.MANUAL,
state="running",
)
else:
ti.dag_run = DagRun(
dag_id=self.dag.dag_id,
run_id="spark_test",
execution_date=DEFAULT_DATE,
run_type=DagRunType.MANUAL,
state="running",
)

session.add(ti)
session.commit()
# When
Expand Down
4 changes: 2 additions & 2 deletions providers/tests/openlineage/plugins/test_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def test_listener_does_not_change_task_instance(render_mock, xcom_push_mock):
run_id=run_id,
data_interval=(date, date),
run_type=types.DagRunType.MANUAL,
state=DagRunState.QUEUED,
state=DagRunState.RUNNING,
**dagrun_kwargs,
)
ti = TaskInstance(t, run_id=run_id)
Expand Down Expand Up @@ -187,7 +187,7 @@ def sample_callable(**kwargs):
run_id=run_id,
data_interval=(date, date),
run_type=types.DagRunType.MANUAL,
state=DagRunState.QUEUED,
state=DagRunState.RUNNING,
**dagrun_kwargs,
)
task_instance = TaskInstance(t, run_id=run_id)
Expand Down
1 change: 1 addition & 0 deletions providers/tests/redis/log/test_redis_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def ti(self):
else:
dag_run = DagRun(dag_id=dag.dag_id, execution_date=date, run_id="test", run_type="scheduled")

dag_run.set_state(State.RUNNING)
with create_session() as session:
session.add(dag_run)
session.commit()
Expand Down

0 comments on commit d2243a7

Please sign in to comment.