Skip to content

Commit

Permalink
Replace external_trigger check with DagRunType
Browse files Browse the repository at this point in the history
  • Loading branch information
jason810496 committed Jan 23, 2025
1 parent f01c53a commit 45cf835
Show file tree
Hide file tree
Showing 61 changed files with 526 additions and 583 deletions.
1 change: 0 additions & 1 deletion airflow/api/client/local_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def trigger_dag(
"data_interval_start": dag_run.data_interval_start,
"data_interval_end": dag_run.data_interval_end,
"end_date": dag_run.end_date,
"external_trigger": dag_run.external_trigger,
"last_scheduling_decision": dag_run.last_scheduling_decision,
"logical_date": dag_run.logical_date,
"run_type": dag_run.run_type,
Expand Down
2 changes: 1 addition & 1 deletion airflow/api/common/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def find_task_relatives(tasks, downstream, upstream):
@provide_session
def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASession = NEW_SESSION):
"""Return DAG executions' run_ids."""
last_dagrun = dag.get_last_dagrun(include_externally_triggered=True, session=session)
last_dagrun = dag.get_last_dagrun(include_manually_triggered=True, session=session)
current_dagrun = dag.get_dagrun(run_id=run_id, session=session)
first_dagrun = session.scalar(
select(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.logical_date.asc()).limit(1)
Expand Down
1 change: 0 additions & 1 deletion airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ def _trigger_dag(
conf=run_conf,
run_type=DagRunType.MANUAL,
triggered_by=triggered_by,
external_trigger=True,
dag_version=dag_version,
state=DagRunState.QUEUED,
session=session,
Expand Down
2 changes: 0 additions & 2 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ def _fetch_dag_runs(
"start_date",
"end_date",
"updated_at",
"external_trigger",
"conf",
]
query = apply_sorting(query, order_by, to_replace, allowed_sort_attrs)
Expand Down Expand Up @@ -354,7 +353,6 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
conf=post_body.get("conf"),
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
external_trigger=True,
dag_version=DagVersion.get_latest_version(dag.dag_id),
state=DagRunState.QUEUED,
session=session,
Expand Down
1 change: 0 additions & 1 deletion airflow/api_connexion/schemas/dag_run_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ class Meta:
start_date = auto_field(dump_only=True)
end_date = auto_field(dump_only=True)
state = DagStateField(dump_only=True)
external_trigger = auto_field(dump_default=True, dump_only=True)
conf = ConfObject()
data_interval_start = auto_field(validate=validate_istimezone)
data_interval_end = auto_field(validate=validate_istimezone)
Expand Down
1 change: 0 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class DAGRunResponse(BaseModel):
last_scheduling_decision: datetime | None
run_type: DagRunType
state: DagRunState
external_trigger: bool
triggered_by: DagRunTriggeredByType
conf: dict
note: str | None
Expand Down
4 changes: 0 additions & 4 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7647,9 +7647,6 @@ components:
$ref: '#/components/schemas/DagRunType'
state:
$ref: '#/components/schemas/DagRunState'
external_trigger:
type: boolean
title: External Trigger
triggered_by:
$ref: '#/components/schemas/DagRunTriggeredByType'
conf:
Expand All @@ -7673,7 +7670,6 @@ components:
- last_scheduling_decision
- run_type
- state
- external_trigger
- triggered_by
- conf
- note
Expand Down
3 changes: 0 additions & 3 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ def get_dag_runs(
"start_date",
"end_date",
"updated_at",
"external_trigger",
"conf",
],
DagRun,
Expand Down Expand Up @@ -375,7 +374,6 @@ def trigger_dag_run(
conf=body.conf,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
external_trigger=True,
dag_version=DagVersion.get_latest_version(dag.dag_id),
state=DagRunState.QUEUED,
session=session,
Expand Down Expand Up @@ -424,7 +422,6 @@ def get_list_dag_runs_batch(
"start_date",
"end_date",
"updated_at",
"external_trigger",
"conf",
],
DagRun,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ class DagRun(BaseModel):
end_date: UtcDateTime | None
run_type: DagRunType
conf: Annotated[dict[str, Any], Field(default_factory=dict)]
external_trigger: bool = False


class TIRunContext(BaseModel):
Expand Down
1 change: 0 additions & 1 deletion airflow/cli/commands/remote_commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ def _get_dag_run(
dag_id=dag.dag_id,
run_id=logical_date_or_run_id,
run_type=DagRunType.MANUAL,
external_trigger=True,
logical_date=dag_run_logical_date,
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
triggered_by=DagRunTriggeredByType.CLI,
Expand Down
3 changes: 1 addition & 2 deletions airflow/example_dags/plugins/event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,8 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):
print("Dag run in failure state")
dag_id = dag_run.dag_id
run_id = dag_run.run_id
external_trigger = dag_run.external_trigger

print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
print(f"Dag information:{dag_id} Run id: {run_id}")
print(f"Failed with message: {msg}")


Expand Down
1 change: 0 additions & 1 deletion airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ def serialize(self):
state=self.dag_run.state,
dag_id=self.dag_run.dag_id,
run_id=self.dag_run.run_id,
external_trigger=self.dag_run.external_trigger,
run_type=self.dag_run.run_type,
)
dag_run.id = self.dag_run.id
Expand Down
8 changes: 6 additions & 2 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1476,14 +1476,18 @@ def _update_state(dag: DAG, dag_run: DagRun):

dag_run.state = DagRunState.RUNNING
dag_run.start_date = timezone.utcnow()
if dag.timetable.periodic and not dag_run.external_trigger and dag_run.clear_number < 1:
if (
dag.timetable.periodic
and not dag_run.run_type == DagRunType.MANUAL
and dag_run.clear_number < 1
):
# TODO: Logically, this should be DagRunInfo.run_after, but the
# information is not stored on a DagRun, only before the actual
# execution on DagModel.next_dagrun_create_after. We should add
# a field on DagRun for this instead of relying on the run
# always happening immediately after the data interval.
# We only publish these metrics for scheduled dag runs and only
# when ``external_trigger`` is *False* and ``clear_number`` is 0.
# when ``run_type`` is not *MANUAL* and ``clear_number`` is 0.
expected_start_date = dag.get_run_data_interval(dag_run).end
schedule_delay = dag_run.start_date - expected_start_date
# Publish metrics twice with backward compatible name, and then with tags
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
remove external_trigger field.
Revision ID: fbe8516980a3
Revises: e39a26ac59f6
Create Date: 2025-01-23 09:53:31.283015
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "fbe8516980a3"
down_revision = "e39a26ac59f6"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
    """Drop the ``external_trigger`` column from the ``dag_run`` table."""
    # Alembic's batch mode keeps the column drop portable to backends with
    # limited ALTER TABLE support.
    with op.batch_alter_table("dag_run", schema=None) as dag_run_table:
        dag_run_table.drop_column("external_trigger")


def downgrade():
    """Re-create the ``external_trigger`` column on the ``dag_run`` table."""
    restored_column = sa.Column(
        "external_trigger",
        sa.BOOLEAN(),
        autoincrement=False,
        nullable=True,
    )
    # NOTE(review): the column comes back nullable with no default or backfill,
    # so rows that already exist will hold NULL after a downgrade -- confirm
    # that code still reading ``external_trigger`` tolerates that.
    with op.batch_alter_table("dag_run", schema=None) as dag_run_table:
        dag_run_table.add_column(restored_column)
22 changes: 9 additions & 13 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def _get_model_data_interval(
return DataInterval(start, end)


def get_last_dagrun(dag_id, session, include_externally_triggered=False):
def get_last_dagrun(dag_id, session, include_manually_triggered=False):
"""
Return the last dag run for a dag, None if there was none.
Expand All @@ -190,8 +190,8 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
"""
DR = DagRun
query = select(DR).where(DR.dag_id == dag_id)
if not include_externally_triggered:
query = query.where(DR.external_trigger == expression.false())
# Manual runs are excluded unless the caller opts in (the flag defaults to
# False). When the flag is True no run-type filter is applied at all, so the
# most recent run of any type is returned.
if not include_manually_triggered:
    query = query.where(DR.run_type != DagRunType.MANUAL)
query = query.order_by(DR.logical_date.desc())
return session.scalar(query.limit(1))

Expand Down Expand Up @@ -252,7 +252,6 @@ def _create_orm_dagrun(
logical_date: datetime | None,
data_interval: DataInterval | None,
start_date: datetime | None,
external_trigger: bool,
conf: Any,
state: DagRunState | None,
run_type: DagRunType,
Expand All @@ -267,7 +266,6 @@ def _create_orm_dagrun(
run_id=run_id,
logical_date=logical_date,
start_date=start_date,
external_trigger=external_trigger,
conf=conf,
state=state,
run_type=run_type,
Expand Down Expand Up @@ -709,16 +707,16 @@ def iter_dagrun_infos_between(
break

@provide_session
def get_last_dagrun(self, session=NEW_SESSION, include_externally_triggered=False):
def get_last_dagrun(self, session=NEW_SESSION, include_manually_triggered=False):
return get_last_dagrun(
self.dag_id, session=session, include_externally_triggered=include_externally_triggered
self.dag_id, session=session, include_manually_triggered=include_manually_triggered
)

@provide_session
def has_dag_runs(self, session=NEW_SESSION, include_externally_triggered=True) -> bool:
def has_dag_runs(self, session=NEW_SESSION, include_manually_triggered=True) -> bool:
return (
get_last_dagrun(
self.dag_id, session=session, include_externally_triggered=include_externally_triggered
self.dag_id, session=session, include_manually_triggered=include_manually_triggered
)
is not None
)
Expand Down Expand Up @@ -1738,7 +1736,6 @@ def create_dagrun(
conf: dict | None = None,
run_type: DagRunType,
triggered_by: DagRunTriggeredByType,
external_trigger: bool = False,
dag_version: DagVersion | None = None,
state: DagRunState,
start_date: datetime | None = None,
Expand Down Expand Up @@ -1806,7 +1803,6 @@ def create_dagrun(
run_id=run_id,
logical_date=logical_date,
start_date=timezone.coerce_datetime(start_date),
external_trigger=external_trigger,
conf=conf,
state=state,
run_type=run_type,
Expand Down Expand Up @@ -2173,9 +2169,9 @@ def get_current(cls, dag_id: str, session=NEW_SESSION) -> DagModel:
return session.scalar(select(cls).where(cls.dag_id == dag_id))

@provide_session
def get_last_dagrun(self, session=NEW_SESSION, include_externally_triggered=False):
def get_last_dagrun(self, session=NEW_SESSION, include_manually_triggered=False):
return get_last_dagrun(
self.dag_id, session=session, include_externally_triggered=include_externally_triggered
self.dag_id, session=session, include_manually_triggered=include_manually_triggered
)

def get_is_paused(self, *, session: Session | None = None) -> bool:
Expand Down
Loading

0 comments on commit 45cf835

Please sign in to comment.