diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py index 800afa2830217..db93f5a52098b 100644 --- a/airflow/api/client/local_client.py +++ b/airflow/api/client/local_client.py @@ -54,7 +54,6 @@ def trigger_dag( "data_interval_start": dag_run.data_interval_start, "data_interval_end": dag_run.data_interval_end, "end_date": dag_run.end_date, - "external_trigger": dag_run.external_trigger, "last_scheduling_decision": dag_run.last_scheduling_decision, "logical_date": dag_run.logical_date, "run_type": dag_run.run_type, diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py index f249199651849..4391202cbc656 100644 --- a/airflow/api/common/mark_tasks.py +++ b/airflow/api/common/mark_tasks.py @@ -141,7 +141,7 @@ def find_task_relatives(tasks, downstream, upstream): @provide_session def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASession = NEW_SESSION): """Return DAG executions' run_ids.""" - last_dagrun = dag.get_last_dagrun(include_externally_triggered=True, session=session) + last_dagrun = dag.get_last_dagrun(include_manually_triggered=True, session=session) current_dagrun = dag.get_dagrun(run_id=run_id, session=session) first_dagrun = session.scalar( select(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.logical_date.asc()).limit(1) diff --git a/airflow/api/common/trigger_dag.py b/airflow/api/common/trigger_dag.py index d04a790c02f88..8e31a6bc5ba1d 100644 --- a/airflow/api/common/trigger_dag.py +++ b/airflow/api/common/trigger_dag.py @@ -105,7 +105,6 @@ def _trigger_dag( conf=run_conf, run_type=DagRunType.MANUAL, triggered_by=triggered_by, - external_trigger=True, dag_version=dag_version, state=DagRunState.QUEUED, session=session, diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 0fd7f537a5e40..5559a59ca3b7c 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ 
b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -186,7 +186,6 @@ def _fetch_dag_runs( "start_date", "end_date", "updated_at", - "external_trigger", "conf", ] query = apply_sorting(query, order_by, to_replace, allowed_sort_attrs) @@ -354,7 +353,6 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse: conf=post_body.get("conf"), run_type=DagRunType.MANUAL, triggered_by=DagRunTriggeredByType.REST_API, - external_trigger=True, dag_version=DagVersion.get_latest_version(dag.dag_id), state=DagRunState.QUEUED, session=session, diff --git a/airflow/api_connexion/schemas/dag_run_schema.py b/airflow/api_connexion/schemas/dag_run_schema.py index c2560613def70..c1d4c897f4900 100644 --- a/airflow/api_connexion/schemas/dag_run_schema.py +++ b/airflow/api_connexion/schemas/dag_run_schema.py @@ -67,7 +67,6 @@ class Meta: start_date = auto_field(dump_only=True) end_date = auto_field(dump_only=True) state = DagStateField(dump_only=True) - external_trigger = auto_field(dump_default=True, dump_only=True) conf = ConfObject() data_interval_start = auto_field(validate=validate_istimezone) data_interval_end = auto_field(validate=validate_istimezone) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 48c92d2a83cb0..56c095353a56e 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -65,7 +65,6 @@ class DAGRunResponse(BaseModel): last_scheduling_decision: datetime | None run_type: DagRunType state: DagRunState - external_trigger: bool triggered_by: DagRunTriggeredByType conf: dict note: str | None diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 6075d0c886ef2..a019330cdf7d7 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -7656,9 +7656,6 @@ components: 
$ref: '#/components/schemas/DagRunType' state: $ref: '#/components/schemas/DagRunState' - external_trigger: - type: boolean - title: External Trigger triggered_by: $ref: '#/components/schemas/DagRunTriggeredByType' conf: @@ -7682,7 +7679,6 @@ components: - last_scheduling_decision - run_type - state - - external_trigger - triggered_by - conf - note diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index cb846caaa2ec5..aef95eacf8f3f 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -282,7 +282,6 @@ def get_dag_runs( "start_date", "end_date", "updated_at", - "external_trigger", "conf", ], DagRun, @@ -375,7 +374,6 @@ def trigger_dag_run( conf=body.conf, run_type=DagRunType.MANUAL, triggered_by=DagRunTriggeredByType.REST_API, - external_trigger=True, dag_version=DagVersion.get_latest_version(dag.dag_id), state=DagRunState.QUEUED, session=session, @@ -424,7 +422,6 @@ def get_list_dag_runs_batch( "start_date", "end_date", "updated_at", - "external_trigger", "conf", ], DagRun, diff --git a/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index 6cc82259cf758..518c2bdb3f804 100644 --- a/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -223,7 +223,6 @@ class DagRun(BaseModel): end_date: UtcDateTime | None run_type: DagRunType conf: Annotated[dict[str, Any], Field(default_factory=dict)] - external_trigger: bool = False class TIRunContext(BaseModel): diff --git a/airflow/cli/commands/remote_commands/task_command.py b/airflow/cli/commands/remote_commands/task_command.py index b1f9182e4c9c3..681b478bfc1a6 100644 --- a/airflow/cli/commands/remote_commands/task_command.py +++ b/airflow/cli/commands/remote_commands/task_command.py @@ -174,7 +174,6 @@ def _get_dag_run( 
dag_id=dag.dag_id, run_id=logical_date_or_run_id, run_type=DagRunType.MANUAL, - external_trigger=True, logical_date=dag_run_logical_date, data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date), triggered_by=DagRunTriggeredByType.CLI, diff --git a/airflow/example_dags/plugins/event_listener.py b/airflow/example_dags/plugins/event_listener.py index 6d9fe2ff11735..35a9725217abc 100644 --- a/airflow/example_dags/plugins/event_listener.py +++ b/airflow/example_dags/plugins/event_listener.py @@ -147,9 +147,8 @@ def on_dag_run_failed(dag_run: DagRun, msg: str): print("Dag run in failure state") dag_id = dag_run.dag_id run_id = dag_run.run_id - external_trigger = dag_run.external_trigger - print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}") + print(f"Dag information:{dag_id} Run id: {run_id}") print(f"Failed with message: {msg}") diff --git a/airflow/exceptions.py b/airflow/exceptions.py index 89358e76bd092..16403bf793eb0 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -270,7 +270,6 @@ def serialize(self): state=self.dag_run.state, dag_id=self.dag_run.dag_id, run_id=self.dag_run.run_id, - external_trigger=self.dag_run.external_trigger, run_type=self.dag_run.run_type, ) dag_run.id = self.dag_run.id diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index e6bd1636b3401..d5e56bcd7b753 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1476,14 +1476,18 @@ def _update_state(dag: DAG, dag_run: DagRun): dag_run.state = DagRunState.RUNNING dag_run.start_date = timezone.utcnow() - if dag.timetable.periodic and not dag_run.external_trigger and dag_run.clear_number < 1: + if ( + dag.timetable.periodic + and not dag_run.run_type == DagRunType.MANUAL + and dag_run.clear_number < 1 + ): # TODO: Logically, this should be DagRunInfo.run_after, but the # information is not stored on a DagRun, only before the actual # 
execution on DagModel.next_dagrun_create_after. We should add # a field on DagRun for this instead of relying on the run # always happening immediately after the data interval. # We only publish these metrics for scheduled dag runs and only - # when ``external_trigger`` is *False* and ``clear_number`` is 0. + # when ``run_type`` is not *MANUAL* and ``clear_number`` is 0. expected_start_date = dag.get_run_data_interval(dag_run).end schedule_delay = dag_run.start_date - expected_start_date # Publish metrics twice with backward compatible name, and then with tags diff --git a/airflow/migrations/versions/0056_3_0_0_remove_external_trigger_field.py b/airflow/migrations/versions/0056_3_0_0_remove_external_trigger_field.py new file mode 100644 index 0000000000000..c0792a148b436 --- /dev/null +++ b/airflow/migrations/versions/0056_3_0_0_remove_external_trigger_field.py @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +remove external_trigger field. + +Revision ID: fbe8516980a3 +Revises: e39a26ac59f6 +Create Date: 2025-01-23 09:53:31.283015 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. 
+revision = "fbe8516980a3" +down_revision = "e39a26ac59f6" +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + + +def upgrade(): + """Apply remove external_trigger field.""" + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("dag_run", schema=None) as batch_op: + batch_op.drop_column("external_trigger") + + # ### end Alembic commands ### + + +def downgrade(): + """Unapply remove external_trigger field.""" + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("dag_run", schema=None) as batch_op: + batch_op.add_column(sa.Column("external_trigger", sa.BOOLEAN(), autoincrement=False, nullable=True)) + + # ### end Alembic commands ### diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 9b88e6d70f71c..c391944d55361 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -181,7 +181,7 @@ def _get_model_data_interval( return DataInterval(start, end) -def get_last_dagrun(dag_id, session, include_externally_triggered=False): +def get_last_dagrun(dag_id, session, include_manually_triggered=False): """ Return the last dag run for a dag, None if there was none. 
@@ -190,8 +190,8 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False): """ DR = DagRun query = select(DR).where(DR.dag_id == dag_id) - if not include_externally_triggered: - query = query.where(DR.external_trigger == expression.false()) + if not include_manually_triggered: + query = query.where(DR.run_type != DagRunType.MANUAL) query = query.order_by(DR.logical_date.desc()) return session.scalar(query.limit(1)) @@ -252,7 +252,6 @@ def _create_orm_dagrun( logical_date: datetime | None, data_interval: DataInterval | None, start_date: datetime | None, - external_trigger: bool, conf: Any, state: DagRunState | None, run_type: DagRunType, @@ -267,7 +266,6 @@ def _create_orm_dagrun( run_id=run_id, logical_date=logical_date, start_date=start_date, - external_trigger=external_trigger, conf=conf, state=state, run_type=run_type, @@ -709,16 +707,16 @@ def iter_dagrun_infos_between( break @provide_session - def get_last_dagrun(self, session=NEW_SESSION, include_externally_triggered=False): + def get_last_dagrun(self, session=NEW_SESSION, include_manually_triggered=False): return get_last_dagrun( - self.dag_id, session=session, include_externally_triggered=include_externally_triggered + self.dag_id, session=session, include_manually_triggered=include_manually_triggered ) @provide_session - def has_dag_runs(self, session=NEW_SESSION, include_externally_triggered=True) -> bool: + def has_dag_runs(self, session=NEW_SESSION, include_manually_triggered=True) -> bool: return ( get_last_dagrun( - self.dag_id, session=session, include_externally_triggered=include_externally_triggered + self.dag_id, session=session, include_manually_triggered=include_manually_triggered ) is not None ) @@ -1738,7 +1736,6 @@ def create_dagrun( conf: dict | None = None, run_type: DagRunType, triggered_by: DagRunTriggeredByType, - external_trigger: bool = False, dag_version: DagVersion | None = None, state: DagRunState, start_date: datetime | None = None, @@ -1806,7 +1803,6 @@ def 
create_dagrun( run_id=run_id, logical_date=logical_date, start_date=timezone.coerce_datetime(start_date), - external_trigger=external_trigger, conf=conf, state=state, run_type=run_type, @@ -2173,9 +2169,9 @@ def get_current(cls, dag_id: str, session=NEW_SESSION) -> DagModel: return session.scalar(select(cls).where(cls.dag_id == dag_id)) @provide_session - def get_last_dagrun(self, session=NEW_SESSION, include_externally_triggered=False): + def get_last_dagrun(self, session=NEW_SESSION, include_manually_triggered=False): return get_last_dagrun( - self.dag_id, session=session, include_externally_triggered=include_externally_triggered + self.dag_id, session=session, include_manually_triggered=include_manually_triggered ) def get_is_paused(self, *, session: Session | None = None) -> bool: diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 35d8af4322c49..71bfbaf1d59eb 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -26,7 +26,6 @@ import re2 from sqlalchemy import ( JSON, - Boolean, Column, Enum, ForeignKey, @@ -136,7 +135,6 @@ class DagRun(Base, LoggingMixin): _state = Column("state", String(50), default=DagRunState.QUEUED) run_id = Column(StringID(), nullable=False) creating_job_id = Column(Integer) - external_trigger = Column(Boolean, default=True) run_type = Column(String(50), nullable=False) triggered_by = Column( Enum(DagRunTriggeredByType, native_enum=False, length=50) @@ -232,7 +230,6 @@ def __init__( queued_at: datetime | None | ArgNotSet = NOTSET, logical_date: datetime | None = None, start_date: datetime | None = None, - external_trigger: bool | None = None, conf: Any | None = None, state: DagRunState | None = None, run_type: str | None = None, @@ -254,7 +251,6 @@ def __init__( self.run_id = run_id self.logical_date = logical_date self.start_date = start_date - self.external_trigger = external_trigger self.conf = conf or {} if state is not None: self.state = state @@ -273,7 +269,7 @@ def __init__( def __repr__(self): 
return ( f"" + f"queued_at: {self.queued_at}. run_type: {self.run_type}>" ) @validates("run_id") @@ -543,7 +539,6 @@ def find( run_id: Iterable[str] | None = None, logical_date: datetime | Iterable[datetime] | None = None, state: DagRunState | None = None, - external_trigger: bool | None = None, no_backfills: bool = False, run_type: DagRunType | None = None, session: Session = NEW_SESSION, @@ -558,7 +553,6 @@ def find( :param run_type: type of DagRun :param logical_date: the logical date :param state: the state of the dag run - :param external_trigger: whether this dag run is externally triggered :param no_backfills: return no backfills (True), return all (False). Defaults to False :param session: database session @@ -586,8 +580,6 @@ def find( qry = qry.where(cls.logical_date <= logical_end_date) if state: qry = qry.where(cls.state == state) - if external_trigger is not None: - qry = qry.where(cls.external_trigger == external_trigger) if run_type: qry = qry.where(cls.run_type == run_type) if no_backfills: @@ -970,7 +962,7 @@ def recalculate(self) -> _UnfinishedStates: msg = ( "DagRun Finished: dag_id=%s, logical_date=%s, run_id=%s, " "run_start_date=%s, run_end_date=%s, run_duration=%s, " - "state=%s, external_trigger=%s, run_type=%s, " + "state=%s, run_type=%s, " "data_interval_start=%s, data_interval_end=%s, dag_version_name=%s" ) dagv = session.scalar(select(DagVersion).where(DagVersion.id == self.dag_version_id)) @@ -987,7 +979,6 @@ def recalculate(self) -> _UnfinishedStates: else None ), self._state, - self.external_trigger, self.run_type, self.data_interval_start, self.data_interval_end, @@ -1022,7 +1013,6 @@ def _trace_dagrun(self, dagv) -> None: if self.start_date and self.end_date else 0, "state": str(self._state), - "external_trigger": self.external_trigger, "run_type": str(self.run_type), "data_interval_start": str(self.data_interval_start), "data_interval_end": str(self.data_interval_end), @@ -1227,11 +1217,11 @@ def 
_emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis: lis rid of the outliers on the stats side through dashboards tooling. Note that the stat will only be emitted for scheduler-triggered DAG runs - (i.e. when ``external_trigger`` is *False* and ``clear_number`` is equal to 0). + (i.e. when ``run_type`` is not *MANUAL* and ``clear_number`` is equal to 0). """ if self.state == TaskInstanceState.RUNNING: return - if self.external_trigger: + if self.run_type == DagRunType.MANUAL: return if self.clear_number > 0: return diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index f1aa3a8236e9c..1d88e302be958 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -842,7 +842,6 @@ def _set_ti_attrs(target, source, include_dag_run=False): target.dag_run.state = source.dag_run.state target.dag_run.run_id = source.dag_run.run_id target.dag_run.creating_job_id = source.dag_run.creating_job_id - target.dag_run.external_trigger = source.dag_run.external_trigger target.dag_run.run_type = source.dag_run.run_type target.dag_run.conf = source.dag_run.conf target.dag_run.data_interval_start = source.dag_run.data_interval_start diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 1873dbbd9e206..3137f52b93d86 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2028,10 +2028,6 @@ export const $DAGRunResponse = { state: { $ref: "#/components/schemas/DagRunState", }, - external_trigger: { - type: "boolean", - title: "External Trigger", - }, triggered_by: { $ref: "#/components/schemas/DagRunTriggeredByType", }, @@ -2064,7 +2060,6 @@ export const $DAGRunResponse = { "last_scheduling_decision", "run_type", "state", - "external_trigger", "triggered_by", "conf", "note", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 
1b597efdb07aa..0c27d970c0414 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -512,7 +512,6 @@ export type DAGRunResponse = { last_scheduling_decision: string | null; run_type: DagRunType; state: DagRunState; - external_trigger: boolean; triggered_by: DagRunTriggeredByType; conf: { [key: string]: unknown; diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 1a1eb6f4d3500..38196dcc74484 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol): "2.9.2": "686269002441", "2.10.0": "22ed7efa9da2", "2.10.3": "5f2621c13b39", - "3.0.0": "e39a26ac59f6", + "3.0.0": "fbe8516980a3", } diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py index 8c7b95a6e8940..d5bd93c0ae2a1 100644 --- a/airflow/utils/db_cleanup.py +++ b/airflow/utils/db_cleanup.py @@ -30,7 +30,7 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Any -from sqlalchemy import and_, column, false, func, inspect, select, table, text +from sqlalchemy import and_, column, func, inspect, select, table, text from sqlalchemy.exc import OperationalError, ProgrammingError from sqlalchemy.ext.compiler import compiles from sqlalchemy.orm import aliased @@ -107,9 +107,9 @@ def readable_config(self): _TableConfig( table_name="dag_run", recency_column_name="start_date", - extra_columns=["dag_id", "external_trigger"], + extra_columns=["dag_id", "run_type"], keep_last=True, - keep_last_filters=[column("external_trigger") == false()], + keep_last_filters=[column("run_type") != "manual"], keep_last_group_by=["dag_id"], ), _TableConfig(table_name="asset_event", recency_column_name="timestamp"), diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 7860e3cf7282d..e64d53f51fe40 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -179,7 +179,6 @@ def encode_dag_run( "data_interval_end": datetime_to_string(dag_run.data_interval_end), "run_type": 
dag_run.run_type, "last_scheduling_decision": datetime_to_string(dag_run.last_scheduling_decision), - "external_trigger": dag_run.external_trigger, "conf": dag_run_conf, "conf_is_json": conf_is_json, "note": dag_run.note, diff --git a/airflow/www/views.py b/airflow/www/views.py index dd5bc856f74a3..2bc1c9ab4ab84 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2238,7 +2238,6 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION): conf=run_conf, run_type=DagRunType.MANUAL, triggered_by=DagRunTriggeredByType.UI, - external_trigger=True, dag_version=DagVersion.get_latest_version(dag.dag_id), state=DagRunState.QUEUED, session=session, @@ -4779,7 +4778,6 @@ class DagRunModelView(AirflowModelView): "start_date", "end_date", "note", - "external_trigger", "conf", "duration", ] @@ -4792,7 +4790,6 @@ class DagRunModelView(AirflowModelView): "start_date", "end_date", "note", - "external_trigger", ] label_columns = { "logical_date": "Logical Date", @@ -4818,7 +4815,6 @@ class DagRunModelView(AirflowModelView): "start_date", "end_date", # "note", # todo: maybe figure out how to re-enable this - "external_trigger", "conf", ] diff --git a/dev/perf/scheduler_dag_execution_timing.py b/dev/perf/scheduler_dag_execution_timing.py index 558b4eba4980c..75df2c7f7f353 100755 --- a/dev/perf/scheduler_dag_execution_timing.py +++ b/dev/perf/scheduler_dag_execution_timing.py @@ -175,7 +175,6 @@ def create_dag_runs(dag, num_runs, session): data_interval=(logical_date, logical_date), run_type=DagRunType.MANUAL, triggered_by=DagRunTriggeredByType.TEST, - external_trigger=False, dag_version=None, state=DagRunState.RUNNING, start_date=timezone.utcnow(), diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 5626cf9708b0b..d655804175577 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -cb858681fdc7a596db20c1c5dbf93812fd011a6df1e0b5322a49a51c8476bb93 \ No 
newline at end of file +3d1e3f483de8d0bf9f9d4159c7781a979337b05fc55a18e2891a7b50a326a080 \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 3fa6369924caa..43080c8ecdca9 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -80,121 +80,121 @@ [INTEGER] - - -slot_pool - -slot_pool - -id - - [INTEGER] - NOT NULL - -description - - [TEXT] - -include_deferred - - [BOOLEAN] - NOT NULL - -pool - - [VARCHAR(256)] - -slots - - [INTEGER] - - + callback_request - -callback_request - -id - - [INTEGER] - NOT NULL - -callback_data - - [JSON] - NOT NULL - -callback_type - - [VARCHAR(20)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -priority_weight - - [INTEGER] - NOT NULL + +callback_request + +id + + [INTEGER] + NOT NULL + +callback_data + + [JSON] + NOT NULL + +callback_type + + [VARCHAR(20)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +priority_weight + + [INTEGER] + NOT NULL - + connection - -connection - -id - - [INTEGER] - NOT NULL - -conn_id - - [VARCHAR(250)] - NOT NULL - -conn_type - - [VARCHAR(500)] - NOT NULL - -description - - [TEXT] - -extra - - [TEXT] - -host - - [VARCHAR(500)] - -is_encrypted - - [BOOLEAN] - -is_extra_encrypted - - [BOOLEAN] - -login - - [TEXT] - -password - - [TEXT] - -port - - [INTEGER] - -schema - - [VARCHAR(500)] + +connection + +id + + [INTEGER] + NOT NULL + +conn_id + + [VARCHAR(250)] + NOT NULL + +conn_type + + [VARCHAR(500)] + NOT NULL + +description + + [TEXT] + +extra + + [TEXT] + +host + + [VARCHAR(500)] + +is_encrypted + + [BOOLEAN] + +is_extra_encrypted + + [BOOLEAN] + +login + + [TEXT] + +password + + [TEXT] + +port + + [INTEGER] + +schema + + [VARCHAR(500)] + + + +slot_pool + +slot_pool + +id + + [INTEGER] + NOT NULL + +description + + [TEXT] + +include_deferred + + [BOOLEAN] + NOT NULL + +pool + + [VARCHAR(256)] + +slots + + [INTEGER] @@ -1047,30 +1047,30 @@ task_instance--task_map - + 
0..N -1 +1 task_instance--task_map - + 0..N -1 +1 task_instance--task_map - + 0..N -1 +1 task_instance--task_map - + 0..N -1 +1 @@ -1120,30 +1120,30 @@ task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 @@ -1731,41 +1731,41 @@ deadline - -deadline - -id - - [UUID] - NOT NULL - -callback - - [VARCHAR(500)] - NOT NULL - -callback_kwargs - - [JSON] - -dag_id - - [VARCHAR(250)] - -dagrun_id - - [INTEGER] - -deadline - - [TIMESTAMP] - NOT NULL + +deadline + +id + + [UUID] + NOT NULL + +callback + + [VARCHAR(500)] + NOT NULL + +callback_kwargs + + [JSON] + +dag_id + + [VARCHAR(250)] + +dagrun_id + + [INTEGER] + +deadline + + [TIMESTAMP] + NOT NULL dag--deadline - -0..N + +0..N {0,1} @@ -1778,108 +1778,104 @@ dag_run - -dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - -bundle_version - - [VARCHAR(250)] - -clear_number - - [INTEGER] - NOT NULL - -conf - - [JSONB] - -creating_job_id - - [INTEGER] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -data_interval_end - - [TIMESTAMP] - -data_interval_start - - [TIMESTAMP] - -end_date - - [TIMESTAMP] - -external_trigger - - [BOOLEAN] - -last_scheduling_decision - - [TIMESTAMP] - -log_template_id - - [INTEGER] - -logical_date - - [TIMESTAMP] - NOT NULL - -queued_at - - [TIMESTAMP] - -run_id - - [VARCHAR(250)] - NOT NULL - -run_type - - [VARCHAR(50)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(50)] - -triggered_by - - [VARCHAR(50)] - -updated_at - - [TIMESTAMP] + +dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + +bundle_version + + [VARCHAR(250)] + +clear_number + + [INTEGER] + NOT NULL + +conf + + [JSONB] + +creating_job_id + + [INTEGER] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +data_interval_end + + [TIMESTAMP] + +data_interval_start + + [TIMESTAMP] + +end_date + + [TIMESTAMP] + +last_scheduling_decision 
+ + [TIMESTAMP] + +log_template_id + + [INTEGER] + +logical_date + + [TIMESTAMP] + NOT NULL + +queued_at + + [TIMESTAMP] + +run_id + + [VARCHAR(250)] + NOT NULL + +run_type + + [VARCHAR(50)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(50)] + +triggered_by + + [VARCHAR(50)] + +updated_at + + [TIMESTAMP] dag_version--dag_run - -0..N + +0..N {0,1} @@ -1979,121 +1975,121 @@ dag_run--dagrun_asset_event - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--deadline - -0..N -{0,1} + +0..N +{0,1} backfill_dag_run - -backfill_dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - NOT NULL - -dag_run_id - - [INTEGER] - -exception_reason - - [VARCHAR(250)] - -logical_date - - [TIMESTAMP] - NOT NULL - -sort_ordinal - - [INTEGER] - NOT NULL + +backfill_dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + NOT NULL + +dag_run_id + + [INTEGER] + +exception_reason + + [VARCHAR(250)] + +logical_date + + [TIMESTAMP] + NOT NULL + +sort_ordinal + + [INTEGER] + NOT NULL dag_run--backfill_dag_run - -0..N -{0,1} + +0..N +{0,1} dag_run_note - -dag_run_note - -dag_run_id - - [INTEGER] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] dag_run--dag_run_note - -1 -1 + +1 +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 @@ -2124,9 +2120,9 @@ log_template--dag_run - -0..N -{0,1} + +0..N +{0,1} @@ -2190,16 +2186,16 @@ backfill--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill--backfill_dag_run - -0..N -1 + +0..N +1 @@ -2319,28 +2315,28 @@ ab_user_role - -ab_user_role - -id - - [INTEGER] - NOT NULL - -role_id - - [INTEGER] - -user_id - - 
[INTEGER] + +ab_user_role + +id + + [INTEGER] + NOT NULL + +role_id + + [INTEGER] + +user_id + + [INTEGER] ab_user--ab_user_role - -0..N -{0,1} + +0..N +{0,1} @@ -2430,28 +2426,28 @@ ab_permission_view_role - -ab_permission_view_role - -id - - [INTEGER] - NOT NULL - -permission_view_id - - [INTEGER] - -role_id - - [INTEGER] + +ab_permission_view_role + +id + + [INTEGER] + NOT NULL + +permission_view_id + + [INTEGER] + +role_id + + [INTEGER] ab_permission_view--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} @@ -2495,16 +2491,16 @@ ab_role--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_role--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 62013ff8f799c..1bdb2d40fccb5 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``e39a26ac59f6`` (head) | ``38770795785f`` | ``3.0.0`` | remove pickled data from dagrun table. | +| ``fbe8516980a3`` (head) | ``e39a26ac59f6`` | ``3.0.0`` | remove external_trigger field. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``e39a26ac59f6`` | ``38770795785f`` | ``3.0.0`` | remove pickled data from dagrun table. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``38770795785f`` | ``5c9c0231baa2`` | ``3.0.0`` | Add asset reference models. 
| +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/providers/src/airflow/providers/openlineage/facets/AirflowDagRunFacet.json b/providers/src/airflow/providers/openlineage/facets/AirflowDagRunFacet.json index 165a8e6a59855..c3a83b216ca4e 100644 --- a/providers/src/airflow/providers/openlineage/facets/AirflowDagRunFacet.json +++ b/providers/src/airflow/providers/openlineage/facets/AirflowDagRunFacet.json @@ -75,9 +75,6 @@ "type": "string", "format": "date-time" }, - "external_trigger": { - "type": "boolean" - }, "run_id": { "type": "string" }, diff --git a/providers/src/airflow/providers/openlineage/facets/AirflowRunFacet.json b/providers/src/airflow/providers/openlineage/facets/AirflowRunFacet.json index 621eb6a761453..338ae3ce55b17 100644 --- a/providers/src/airflow/providers/openlineage/facets/AirflowRunFacet.json +++ b/providers/src/airflow/providers/openlineage/facets/AirflowRunFacet.json @@ -235,9 +235,6 @@ "type": "string", "format": "date-time" }, - "external_trigger": { - "type": "boolean" - }, "run_id": { "type": "string" }, diff --git a/providers/src/airflow/providers/openlineage/utils/utils.py b/providers/src/airflow/providers/openlineage/utils/utils.py index 734437f44adc0..5ad0030cd6762 100644 --- a/providers/src/airflow/providers/openlineage/utils/utils.py +++ b/providers/src/airflow/providers/openlineage/utils/utils.py @@ -335,7 +335,6 @@ class DagRunInfo(InfoJsonEncodable): "dag_id", "data_interval_start", "data_interval_end", - "external_trigger", "run_id", "run_type", "start_date", diff --git a/providers/src/airflow/providers/standard/operators/latest_only.py b/providers/src/airflow/providers/standard/operators/latest_only.py index c8b7ce16fb64d..4f46bb1dd8065 100644 --- a/providers/src/airflow/providers/standard/operators/latest_only.py +++ b/providers/src/airflow/providers/standard/operators/latest_only.py @@ -25,6 +25,7 @@ import pendulum from 
airflow.operators.branch import BaseBranchOperator +from airflow.utils.types import DagRunType if TYPE_CHECKING: from airflow.models import DAG, DagRun @@ -40,7 +41,7 @@ class LatestOnlyOperator(BaseBranchOperator): """ Skip tasks that are not running during the most recent schedule interval. - If the task is run outside the latest schedule interval (i.e. external_trigger), + If the task is run outside the latest schedule interval (i.e. run_type == DagRunType.MANUAL), all directly downstream tasks will be skipped. Note that downstream tasks are never skipped if the given DAG_Run is @@ -53,7 +54,7 @@ def choose_branch(self, context: Context) -> str | Iterable[str]: # If the DAG Run is externally triggered, then return without # skipping downstream tasks dag_run: DagRun = context["dag_run"] # type: ignore[assignment] - if dag_run.external_trigger: + if dag_run.run_type == DagRunType.MANUAL: self.log.info("Externally triggered DAG_Run: allowing execution to proceed.") return list(context["task"].get_direct_relative_ids(upstream=False)) diff --git a/providers/tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py b/providers/tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py index e745d3d655bdc..cce9a884768d0 100644 --- a/providers/tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py +++ b/providers/tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py @@ -151,7 +151,6 @@ def _create_test_dag_run(self, state=DagRunState.RUNNING, extra_dag=False, commi run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time) + timedelta(days=i - 1), start_date=timezone.parse(self.default_time), - external_trigger=True, state=state, **triggered_by_kwargs, ) @@ -168,7 +167,6 @@ def _create_test_dag_run(self, state=DagRunState.RUNNING, extra_dag=False, commi run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time_2), start_date=timezone.parse(self.default_time), - external_trigger=True, state=state, ) ) @@ -200,7 +198,6 
@@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions "end_date": None, "state": "running", "logical_date": self.default_time, - "external_trigger": True, "start_date": self.default_time, "conf": {}, "data_interval_end": None, @@ -216,7 +213,6 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions "end_date": None, "state": "running", "logical_date": self.default_time_2, - "external_trigger": True, "start_date": self.default_time, "conf": {}, "data_interval_end": None, diff --git a/providers/tests/openlineage/plugins/test_adapter.py b/providers/tests/openlineage/plugins/test_adapter.py index 9e19437142b10..96d47d33fcd2f 100644 --- a/providers/tests/openlineage/plugins/test_adapter.py +++ b/providers/tests/openlineage/plugins/test_adapter.py @@ -51,6 +51,7 @@ ) from airflow.providers.openlineage.utils.utils import get_airflow_job_facet from airflow.utils.task_group import TaskGroup +from airflow.utils.types import DagRunType from tests_common.test_utils.compat import BashOperator from tests_common.test_utils.config import conf_vars @@ -585,9 +586,8 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat "dag_id": "dag_id", "data_interval_start": event_time.isoformat(), "data_interval_end": event_time.isoformat(), - "external_trigger": False if AIRFLOW_V_3_0_PLUS else None, "run_id": run_id, - "run_type": None, + "run_type": DagRunType.MANUAL if AIRFLOW_V_3_0_PLUS else None, "start_date": event_time.isoformat(), }, ) @@ -626,9 +626,8 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat "dag_id": "dag_id", "data_interval_start": event_time.isoformat(), "data_interval_end": event_time.isoformat(), - "external_trigger": False if AIRFLOW_V_3_0_PLUS else None, "run_id": run_id, - "run_type": None, + "run_type": DagRunType.MANUAL if AIRFLOW_V_3_0_PLUS else None, "start_date": event_time.isoformat(), }, ), diff --git 
a/providers/tests/openlineage/utils/test_utils.py b/providers/tests/openlineage/utils/test_utils.py index 3f653d9025ef9..6c9df3d2351d8 100644 --- a/providers/tests/openlineage/utils/test_utils.py +++ b/providers/tests/openlineage/utils/test_utils.py @@ -129,7 +129,6 @@ def test_get_airflow_dag_run_facet(): dagrun_mock.dag_id = dag.dag_id dagrun_mock.data_interval_start = datetime.datetime(2024, 6, 1, 1, 2, 3, tzinfo=datetime.timezone.utc) dagrun_mock.data_interval_end = datetime.datetime(2024, 6, 1, 2, 3, 4, tzinfo=datetime.timezone.utc) - dagrun_mock.external_trigger = True dagrun_mock.run_id = "manual_2024-06-01T00:00:00+00:00" dagrun_mock.run_type = DagRunType.MANUAL dagrun_mock.start_date = datetime.datetime(2024, 6, 1, 1, 2, 4, tzinfo=datetime.timezone.utc) @@ -157,7 +156,6 @@ def test_get_airflow_dag_run_facet(): "dag_id": "dag", "data_interval_start": "2024-06-01T01:02:03+00:00", "data_interval_end": "2024-06-01T02:03:04+00:00", - "external_trigger": True, "run_id": "manual_2024-06-01T00:00:00+00:00", "run_type": "manual", "start_date": "2024-06-01T01:02:04+00:00", diff --git a/providers/tests/standard/operators/test_bash.py b/providers/tests/standard/operators/test_bash.py index 13b3fad3ad49e..59a0c8bbf239f 100644 --- a/providers/tests/standard/operators/test_bash.py +++ b/providers/tests/standard/operators/test_bash.py @@ -113,7 +113,6 @@ def test_echo_env_variables( logical_date=logical_date, start_date=utc_now, state=State.RUNNING, - external_trigger=False, data_interval=(logical_date, logical_date), ) diff --git a/providers/tests/standard/operators/test_latest_only_operator.py b/providers/tests/standard/operators/test_latest_only_operator.py index 5884c54774c39..64e201ec6871d 100644 --- a/providers/tests/standard/operators/test_latest_only_operator.py +++ b/providers/tests/standard/operators/test_latest_only_operator.py @@ -192,7 +192,6 @@ def test_not_skipping_external(self, dag_maker): start_date=timezone.utcnow(), logical_date=DEFAULT_DATE, 
state=State.RUNNING, - external_trigger=True, data_interval=(DEFAULT_DATE, DEFAULT_DATE), **triggered_by_kwargs, ) @@ -203,7 +202,6 @@ def test_not_skipping_external(self, dag_maker): start_date=timezone.utcnow(), logical_date=logical_date, state=State.RUNNING, - external_trigger=True, data_interval=(logical_date, logical_date), **triggered_by_kwargs, ) @@ -213,7 +211,6 @@ def test_not_skipping_external(self, dag_maker): start_date=timezone.utcnow(), logical_date=END_DATE, state=State.RUNNING, - external_trigger=True, data_interval=(END_DATE, END_DATE), **triggered_by_kwargs, ) diff --git a/task_sdk/src/airflow/sdk/api/datamodels/_generated.py b/task_sdk/src/airflow/sdk/api/datamodels/_generated.py index 3383e61d1c395..97a7713ef4aaf 100644 --- a/task_sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task_sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -248,7 +248,6 @@ class DagRun(BaseModel): end_date: Annotated[datetime | None, Field(title="End Date")] = None run_type: DagRunType conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None - external_trigger: Annotated[bool, Field(title="External Trigger")] = False class HTTPValidationError(BaseModel): diff --git a/task_sdk/src/airflow/sdk/types.py b/task_sdk/src/airflow/sdk/types.py index fd02104fb2fc4..1a886b45ed1c8 100644 --- a/task_sdk/src/airflow/sdk/types.py +++ b/task_sdk/src/airflow/sdk/types.py @@ -43,7 +43,6 @@ class DagRunProtocol(Protocol): end_date: datetime | None run_type: Any conf: dict[str, Any] | None - external_trigger: bool class RuntimeTaskInstanceProtocol(Protocol): diff --git a/tests/api_connexion/endpoints/test_asset_endpoint.py b/tests/api_connexion/endpoints/test_asset_endpoint.py index 57bea9c6643e2..e13607d03c12e 100644 --- a/tests/api_connexion/endpoints/test_asset_endpoint.py +++ b/tests/api_connexion/endpoints/test_asset_endpoint.py @@ -534,7 +534,6 @@ def test_includes_created_dagrun(self, session): run_type=DagRunType.ASSET_TRIGGERED, 
logical_date=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), - external_trigger=True, state="success", ) dagrun.end_date = timezone.parse(self.default_time) diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 9558dd4fd256a..ed00e8715571e 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -110,7 +110,6 @@ def _create_test_dag_run(self, state=DagRunState.RUNNING, extra_dag=False, commi run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time) + timedelta(days=i - 1), start_date=timezone.parse(self.default_time), - external_trigger=True, state=state, **triggered_by_kwargs, ) @@ -127,7 +126,6 @@ def _create_test_dag_run(self, state=DagRunState.RUNNING, extra_dag=False, commi run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time_2), start_date=timezone.parse(self.default_time), - external_trigger=True, state=state, ) ) @@ -191,7 +189,6 @@ def test_should_respond_200(self, session): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), - external_trigger=True, state="running", **triggered_by_kwargs, ) @@ -201,7 +198,6 @@ def test_should_respond_200(self, session): "end_date": None, "state": "running", "logical_date": self.default_time, - "external_trigger": True, "start_date": self.default_time, "conf": {}, "data_interval_end": None, @@ -241,7 +237,6 @@ def test_should_raises_401_unauthenticated(self, session): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), - external_trigger=True, ) session.add(dagrun_model) session.commit() @@ -264,7 +259,6 @@ def test_should_return_specified_fields(self, session, fields): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), 
start_date=timezone.parse(self.default_time), - external_trigger=True, state="running", ) session.add(dagrun_model) @@ -288,7 +282,6 @@ def test_should_respond_400_with_not_exists_fields(self, session): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), - external_trigger=True, state="running", ) session.add(dagrun_model) @@ -312,7 +305,6 @@ def test_should_respond_200(self, session): "end_date": None, "state": "running", "logical_date": self.default_time, - "external_trigger": True, "start_date": self.default_time, "conf": {}, "data_interval_end": None, @@ -328,7 +320,6 @@ def test_should_respond_200(self, session): "end_date": None, "state": "running", "logical_date": self.default_time_2, - "external_trigger": True, "start_date": self.default_time, "conf": {}, "data_interval_end": None, @@ -382,7 +373,6 @@ def test_return_correct_results_with_order_by(self, session): "end_date": None, "state": "running", "logical_date": self.default_time_2, - "external_trigger": True, "start_date": self.default_time, "conf": {}, "data_interval_end": None, @@ -398,7 +388,6 @@ def test_return_correct_results_with_order_by(self, session): "end_date": None, "state": "running", "logical_date": self.default_time, - "external_trigger": True, "start_date": self.default_time, "conf": {}, "data_interval_end": None, @@ -551,7 +540,6 @@ def _create_dag_runs(self, count): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time) + timedelta(minutes=i), start_date=timezone.parse(self.default_time), - external_trigger=True, ) for i in range(1, count + 1) ] @@ -656,7 +644,6 @@ def _create_dag_runs(self): run_type=DagRunType.MANUAL, logical_date=timezone.parse(dates[i]), start_date=timezone.parse(dates[i]), - external_trigger=True, state=DagRunState.SUCCESS, ) for i in range(len(dates)) @@ -699,7 +686,6 @@ def test_should_respond_200(self): "end_date": None, "state": "running", "logical_date": 
self.default_time, - "external_trigger": True, "start_date": self.default_time, "conf": {}, "data_interval_end": None, @@ -716,7 +702,6 @@ def test_should_respond_200(self): "end_date": None, "state": "running", "logical_date": self.default_time_2, - "external_trigger": True, "start_date": self.default_time, "conf": {}, "data_interval_end": None, @@ -777,7 +762,6 @@ def test_order_by_descending_works(self): "end_date": None, "state": "running", "logical_date": self.default_time_2, - "external_trigger": True, "start_date": self.default_time, "conf": {}, "data_interval_end": None, @@ -793,7 +777,6 @@ def test_order_by_descending_works(self): "end_date": None, "state": "running", "logical_date": self.default_time, - "external_trigger": True, "start_date": self.default_time, "conf": {}, "data_interval_end": None, @@ -926,7 +909,6 @@ def _create_dag_runs(self, count): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time) + timedelta(minutes=i), start_date=timezone.parse(self.default_time), - external_trigger=True, ) for i in range(1, count + 1) ] @@ -1009,7 +991,6 @@ def _create_dag_runs(self): run_type=DagRunType.MANUAL, logical_date=timezone.parse(date), start_date=timezone.parse(date), - external_trigger=True, state="success", ) for i, date in enumerate(dates) @@ -1157,7 +1138,6 @@ def test_should_respond_200( "dag_run_id": expected_dag_run_id, "end_date": None, "logical_date": expected_logical_date, - "external_trigger": True, "start_date": None, "state": "queued", "data_interval_end": expected_data_interval_end, @@ -1266,7 +1246,6 @@ def test_should_response_200_for_matching_logical_date(self): "dag_run_id": dag_run_id, "end_date": None, "logical_date": logical_date, - "external_trigger": True, "start_date": None, "state": "queued", "data_interval_end": logical_date, @@ -1512,7 +1491,6 @@ def test_should_respond_200(self, state, run_type, dag_maker, session): "dag_run_id": dag_run_id, "end_date": dr.end_date.isoformat() if state != State.QUEUED 
else None, "logical_date": dr.logical_date.isoformat(), - "external_trigger": False, "start_date": dr.start_date.isoformat() if state != State.QUEUED else None, "state": state, "data_interval_start": dr.data_interval_start.isoformat(), @@ -1684,7 +1662,6 @@ def test_should_respond_200(self, dag_maker, session): "dag_id": dag_id, "dag_run_id": dag_run_id, "end_date": None, - "external_trigger": False, "logical_date": dr.logical_date.isoformat(), "start_date": None, "state": "queued", @@ -1877,7 +1854,6 @@ def test_should_raises_401_unauthenticated(self, session): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), - external_trigger=True, ) session.add(dagrun_model) session.commit() @@ -1906,7 +1882,6 @@ def test_should_respond_200(self, dag_maker, session): "dag_id": dr.dag_id, "dag_run_id": dr.run_id, "end_date": dr.end_date.isoformat(), - "external_trigger": True, "logical_date": self.default_time, "start_date": self.default_time, "state": "success", @@ -1936,7 +1911,6 @@ def test_should_respond_200(self, dag_maker, session): "dag_run_id": dr.run_id, "end_date": dr.end_date.isoformat(), "logical_date": self.default_time, - "external_trigger": True, "start_date": self.default_time, "state": "success", "data_interval_start": None, diff --git a/tests/api_connexion/endpoints/test_dag_stats_endpoint.py b/tests/api_connexion/endpoints/test_dag_stats_endpoint.py index f3538e852eddd..ebb4a9ede8a8b 100644 --- a/tests/api_connexion/endpoints/test_dag_stats_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_stats_endpoint.py @@ -86,7 +86,6 @@ def _create_dag_runs(self, session): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), - external_trigger=True, state="running", ) dag_1_run_2 = DagRun( @@ -95,7 +94,6 @@ def _create_dag_runs(self, session): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time) + 
timedelta(days=1), start_date=timezone.parse(self.default_time), - external_trigger=True, state="failed", ) dag_2_run_1 = DagRun( @@ -104,7 +102,6 @@ def _create_dag_runs(self, session): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), - external_trigger=True, state="queued", ) dag_3_run_1 = DagRun( @@ -113,7 +110,6 @@ def _create_dag_runs(self, session): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), - external_trigger=True, state="success", ) session.add_all((dag_1_run_1, dag_1_run_2, dag_2_run_1, dag_3_run_1)) diff --git a/tests/api_connexion/schemas/test_dag_run_schema.py b/tests/api_connexion/schemas/test_dag_run_schema.py index 61b3dac6350a8..8a352b69ee702 100644 --- a/tests/api_connexion/schemas/test_dag_run_schema.py +++ b/tests/api_connexion/schemas/test_dag_run_schema.py @@ -77,7 +77,6 @@ def test_serialize(self, session): "end_date": None, "state": "running", "logical_date": self.default_time, - "external_trigger": True, "start_date": self.default_time, "conf": {"start": "stop"}, "data_interval_end": None, @@ -176,7 +175,6 @@ def test_serialize(self, session): "dag_run_id": "my-dag-run", "end_date": None, "logical_date": self.default_time, - "external_trigger": True, "state": "running", "start_date": self.default_time, "conf": {"start": "stop"}, @@ -195,7 +193,6 @@ def test_serialize(self, session): "end_date": None, "state": "running", "logical_date": self.second_time, - "external_trigger": True, "start_date": self.default_time, "conf": {}, "data_interval_end": None, diff --git a/tests/api_fastapi/common/test_exceptions.py b/tests/api_fastapi/common/test_exceptions.py index 6751aff20c725..7af962ad103f6 100644 --- a/tests/api_fastapi/common/test_exceptions.py +++ b/tests/api_fastapi/common/test_exceptions.py @@ -186,7 +186,7 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe 
status_code=status.HTTP_409_CONFLICT, detail={ "reason": "Unique constraint violation", - "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (SELECT max(log_template.id) AS max_1 \nFROM log_template), ?, ?, ?, ?, ?)", + "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (SELECT max(log_template.id) AS max_1 \nFROM log_template), ?, ?, ?, ?, ?)", "orig_error": "UNIQUE constraint failed: dag_run.dag_id, dag_run.run_id", }, ), @@ -194,7 +194,7 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe status_code=status.HTTP_409_CONFLICT, detail={ "reason": "Unique constraint violation", - "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %s, %s, %s, %s, %s)", + "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, 
backfill_id, dag_version_id, bundle_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %s, %s, %s, %s, %s)", "orig_error": "(1062, \"Duplicate entry 'test_dag_id-test_run_id' for key 'dag_run.dag_run_dag_id_run_id_key'\")", }, ), @@ -202,7 +202,7 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe status_code=status.HTTP_409_CONFLICT, detail={ "reason": "Unique constraint violation", - "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (%(dag_id)s, %(queued_at)s, %(logical_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s, %(external_trigger)s, %(run_type)s, %(triggered_by)s, %(conf)s, %(data_interval_start)s, %(data_interval_end)s, %(last_scheduling_decision)s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %(updated_at)s, %(clear_number)s, %(backfill_id)s, %(dag_version_id)s, %(bundle_version)s) RETURNING dag_run.id", + "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (%(dag_id)s, %(queued_at)s, %(logical_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s, %(run_type)s, %(triggered_by)s, %(conf)s, %(data_interval_start)s, %(data_interval_end)s, %(last_scheduling_decision)s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %(updated_at)s, %(clear_number)s, %(backfill_id)s, %(dag_version_id)s, %(bundle_version)s) RETURNING 
dag_run.id", "orig_error": 'duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"\nDETAIL: Key (dag_id, run_id)=(test_dag_id, test_run_id) already exists.\n', }, ), diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index a48c0da87fc7a..fbed9c7ccfe3e 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -152,7 +152,6 @@ def _create_dag_run(session, num: int = 2): logical_date=DEFAULT_DATE, start_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), - external_trigger=True, state=DagRunState.SUCCESS, ) for i in range(1, 1 + num) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index dd32873b084c9..bbd88ba9dd14c 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -224,7 +224,6 @@ def get_dag_run_dict(run: DagRun): ), "run_type": run.run_type, "state": run.state, - "external_trigger": run.external_trigger, "triggered_by": run.triggered_by.value, "conf": run.conf, "note": run.note, @@ -271,7 +270,6 @@ def test_invalid_order_by_raises_400(self, test_client): pytest.param("start_date", [DAG1_RUN1_ID, DAG1_RUN2_ID], id="order_by_start_date"), pytest.param("end_date", [DAG1_RUN1_ID, DAG1_RUN2_ID], id="order_by_end_date"), pytest.param("updated_at", [DAG1_RUN1_ID, DAG1_RUN2_ID], id="order_by_updated_at"), - pytest.param("external_trigger", [DAG1_RUN1_ID, DAG1_RUN2_ID], id="order_by_external_trigger"), pytest.param("conf", [DAG1_RUN1_ID, DAG1_RUN2_ID], id="order_by_conf"), ], ) @@ -514,7 +512,6 @@ def get_dag_run_dict(run: DagRun): ), "run_type": run.run_type, "state": run.state, - "external_trigger": run.external_trigger, "triggered_by": run.triggered_by.value, "conf": run.conf, "note": run.note, @@ -579,7 +576,6 @@ def 
test_invalid_order_by_raises_400(self, test_client): pytest.param("start_date", DAG_RUNS_LIST, id="order_by_start_date"), pytest.param("end_date", DAG_RUNS_LIST, id="order_by_end_date"), pytest.param("updated_at", DAG_RUNS_LIST, id="order_by_updated_at"), - pytest.param("external_trigger", DAG_RUNS_LIST, id="order_by_external_trigger"), pytest.param("conf", DAG_RUNS_LIST, id="order_by_conf"), ], ) @@ -1180,7 +1176,6 @@ def test_should_respond_200( "dag_run_id": expected_dag_run_id, "end_date": None, "logical_date": fixed_now.replace("+00:00", "Z"), - "external_trigger": True, "start_date": None, "state": "queued", "data_interval_end": expected_data_interval_end, @@ -1353,7 +1348,6 @@ def test_should_response_200_for_duplicate_logical_date(self, test_client): "last_scheduling_decision": None, "run_type": "manual", "state": "queued", - "external_trigger": True, "triggered_by": "rest_api", "conf": {}, "note": note, diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_stats.py b/tests/api_fastapi/core_api/routes/public/test_dag_stats.py index f4c59d48c313a..8a0dae3604c1d 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_stats.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_stats.py @@ -62,7 +62,6 @@ def _create_dag_and_runs(self, session=None): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), - external_trigger=True, state="running", ) dag_1_run_2 = DagRun( @@ -71,7 +70,6 @@ def _create_dag_and_runs(self, session=None): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time) + timedelta(days=1), start_date=timezone.parse(self.default_time), - external_trigger=True, state="failed", ) dag_2 = DagModel( @@ -89,7 +87,6 @@ def _create_dag_and_runs(self, session=None): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), - external_trigger=True, state="queued", ) dag_3 = DagModel( @@ 
-107,7 +104,6 @@ def _create_dag_and_runs(self, session=None): run_type=DagRunType.MANUAL, logical_date=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), - external_trigger=True, state="success", ) entities = ( diff --git a/tests/api_fastapi/execution_api/routes/test_task_instances.py b/tests/api_fastapi/execution_api/routes/test_task_instances.py index 9ccd1b0d088a6..05774a8c30d5f 100644 --- a/tests/api_fastapi/execution_api/routes/test_task_instances.py +++ b/tests/api_fastapi/execution_api/routes/test_task_instances.py @@ -99,7 +99,6 @@ def test_ti_run_state_to_running(self, client, session, create_task_instance, ti "data_interval_end": instant_str, "start_date": instant_str, "end_date": None, - "external_trigger": False, "run_type": "manual", "conf": {}, }, diff --git a/tests/cli/commands/remote_commands/test_asset_command.py b/tests/cli/commands/remote_commands/test_asset_command.py index 067bc9e8e1e2b..fae03f6c6b40b 100644 --- a/tests/cli/commands/remote_commands/test_asset_command.py +++ b/tests/cli/commands/remote_commands/test_asset_command.py @@ -138,7 +138,6 @@ def test_cli_assets_materialize(parser: ArgumentParser) -> None: "conf": {}, "dag_id": "asset1_producer", "end_date": None, - "external_trigger": "True", "last_scheduling_decision": None, "note": None, "run_type": "manual", diff --git a/tests/cli/commands/remote_commands/test_dag_command.py b/tests/cli/commands/remote_commands/test_dag_command.py index 04b0ef88104e7..8eba7158d417b 100644 --- a/tests/cli/commands/remote_commands/test_dag_command.py +++ b/tests/cli/commands/remote_commands/test_dag_command.py @@ -499,7 +499,6 @@ def test_trigger_dag(self): assert dagrun, "DagRun not created" assert dagrun.run_type == DagRunType.MANUAL - assert dagrun.external_trigger assert dagrun.conf == {"foo": "bar"} # Coerced to UTC. 
@@ -529,7 +528,6 @@ def test_trigger_dag_with_microseconds(self): assert dagrun, "DagRun not created" assert dagrun.run_type == DagRunType.MANUAL - assert dagrun.external_trigger assert dagrun.logical_date.isoformat(timespec="microseconds") == "2021-06-04T01:00:00.000001+00:00" def test_trigger_dag_invalid_conf(self): diff --git a/tests/cli/commands/remote_commands/test_task_command.py b/tests/cli/commands/remote_commands/test_task_command.py index 2d6f699c1e788..a043e99c6dcdd 100644 --- a/tests/cli/commands/remote_commands/test_task_command.py +++ b/tests/cli/commands/remote_commands/test_task_command.py @@ -661,7 +661,6 @@ def test_task_states_for_dag_run(self): logical_date=default_date2, data_interval=data_interval, run_type=DagRunType.MANUAL, - external_trigger=True, **v3_kwargs, ) ti2 = TaskInstance(task2, run_id=dagrun.run_id) diff --git a/tests/core/test_core.py b/tests/core/test_core.py index 9103a35674690..760cc1c554f5c 100644 --- a/tests/core/test_core.py +++ b/tests/core/test_core.py @@ -106,7 +106,6 @@ def test_dag_params_and_task_params(self, dag_maker): task2 = EmptyOperator(task_id="task2") dr = dag_maker.create_dagrun( run_type=DagRunType.SCHEDULED, - external_trigger=True, ) task1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) task2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index df823d89c162f..0ad68458359b6 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3425,7 +3425,7 @@ def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, sessi scheduler_job = Job() self.job_runner = SchedulerJobRunner(job=scheduler_job) - dr1 = dag_maker.create_dagrun(external_trigger=True) + dr1 = dag_maker.create_dagrun(run_type=DagRunType.MANUAL) ti = dr1.get_task_instances(session=session)[0] ti.state = adoptable_state ti.queued_by_job_id = old_job.id @@ -3449,7 +3449,7 @@ def 
test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self, dag_maker, s self.job_runner = SchedulerJobRunner(job=scheduler_job) session = settings.Session() - dr1 = dag_maker.create_dagrun(external_trigger=True) + dr1 = dag_maker.create_dagrun(run_type=DagRunType.MANUAL) ti = dr1.get_task_instances(session=session)[0] ti.state = State.QUEUED ti.queued_by_job_id = old_job.id @@ -4079,7 +4079,6 @@ def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self, dag_mak logical_date=timezone.utcnow(), run_type=DagRunType.MANUAL, session=session, - external_trigger=True, data_interval=data_interval, **triggered_by_kwargs, ) @@ -4121,7 +4120,6 @@ def test_scheduler_create_dag_runs_check_existing_run(self, dag_maker): logical_date=dag_model.next_dagrun, start_date=timezone.utcnow(), state=State.RUNNING, - external_trigger=False, session=session, creating_job_id=2, ) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 77b749156af3c..fda2a519eaa27 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -1128,7 +1128,7 @@ def test_schedule_dag_no_previous_runs(self): dag_run.logical_date == TEST_DATE ), f"dag_run.logical_date did not match expectation: {dag_run.logical_date}" assert dag_run.state == State.RUNNING - assert not dag_run.external_trigger + assert dag_run.run_type != DagRunType.MANUAL dag.clear() self._clean_up(dag_id) diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index b447ec0fc9999..c60f9417bbac7 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -120,7 +120,6 @@ def create_dag_run( data_interval=data_interval, start_date=now, state=state, - external_trigger=False, **triggered_by_kwargs, # type: ignore ) @@ -183,7 +182,6 @@ def test_dagrun_find(self, session): logical_date=now, start_date=now, state=DagRunState.RUNNING, - external_trigger=True, ) session.add(dag_run) @@ -191,24 +189,23 @@ def test_dagrun_find(self, session): dag_run = DagRun( dag_id=dag_id2, 
run_id=dag_id2, - run_type=DagRunType.MANUAL, + run_type=DagRunType.SCHEDULED, logical_date=now, start_date=now, state=DagRunState.RUNNING, - external_trigger=False, ) session.add(dag_run) session.commit() - assert len(DagRun.find(dag_id=dag_id1, external_trigger=True)) == 1 + assert len(DagRun.find(dag_id=dag_id1, run_type=DagRunType.MANUAL)) == 1 assert len(DagRun.find(run_id=dag_id1)) == 1 assert len(DagRun.find(run_id=[dag_id1, dag_id2])) == 2 assert len(DagRun.find(logical_date=[now, now])) == 2 assert len(DagRun.find(logical_date=now)) == 2 - assert len(DagRun.find(dag_id=dag_id1, external_trigger=False)) == 0 - assert len(DagRun.find(dag_id=dag_id2, external_trigger=True)) == 0 - assert len(DagRun.find(dag_id=dag_id2, external_trigger=False)) == 1 + assert len(DagRun.find(dag_id=dag_id1, run_type=DagRunType.SCHEDULED)) == 0 + assert len(DagRun.find(dag_id=dag_id2, run_type=DagRunType.MANUAL)) == 0 + assert len(DagRun.find(dag_id=dag_id2)) == 1 def test_dagrun_find_duplicate(self, session): now = timezone.utcnow() @@ -221,7 +218,6 @@ def test_dagrun_find_duplicate(self, session): logical_date=now, start_date=now, state=DagRunState.RUNNING, - external_trigger=True, ) session.add(dag_run) @@ -645,7 +641,6 @@ def test_get_task_instance_on_empty_dagrun(self, session): logical_date=now, start_date=now, state=DagRunState.RUNNING, - external_trigger=False, ) session.add(dag_run) session.commit() diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 8ca00e2f8d650..9989523c2a9cf 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -3781,7 +3781,6 @@ def test_echo_env_variables(self, dag_maker): op = PythonOperator(task_id="hive_in_python_op", python_callable=self._env_var_check_callback) dr = dag_maker.create_dagrun( run_type=DagRunType.MANUAL, - external_trigger=False, ) ti = dr.get_task_instance(op.task_id) ti.state = State.RUNNING diff --git a/tests/operators/test_trigger_dagrun.py 
b/tests/operators/test_trigger_dagrun.py index 74d8371e5d78e..92f5cfd241bca 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -124,7 +124,7 @@ def test_trigger_dagrun(self, dag_maker): task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) dagrun = dag_maker.session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one() - assert dagrun.external_trigger + assert dagrun.run_type == DagRunType.MANUAL assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, dagrun.logical_date) self.assert_extra_link(dagrun, task, dag_maker.session) @@ -165,7 +165,7 @@ def test_trigger_dagrun_with_logical_date(self, dag_maker): with create_session() as session: dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one() - assert dagrun.external_trigger + assert dagrun.run_type == DagRunType.MANUAL assert dagrun.logical_date == custom_logical_date assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, custom_logical_date) self.assert_extra_link(dagrun, task, session) @@ -203,7 +203,7 @@ def test_trigger_dagrun_twice(self, dag_maker): dagruns = dag_maker.session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() assert len(dagruns) == 1 triggered_dag_run = dagruns[0] - assert triggered_dag_run.external_trigger + assert triggered_dag_run.run_type == DagRunType.MANUAL assert triggered_dag_run.logical_date == utc_now self.assert_extra_link(triggered_dag_run, task, dag_maker.session) @@ -241,7 +241,6 @@ def test_trigger_dagrun_with_scheduled_dag_run(self, dag_maker): dagruns = dag_maker.session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() assert len(dagruns) == 1 triggered_dag_run = dagruns[0] - assert triggered_dag_run.external_trigger assert triggered_dag_run.logical_date == utc_now self.assert_extra_link(triggered_dag_run, task, dag_maker.session) @@ -264,7 +263,7 @@ def test_trigger_dagrun_with_templated_logical_date(self, dag_maker): dagruns 
= session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() assert len(dagruns) == 1 triggered_dag_run = dagruns[0] - assert triggered_dag_run.external_trigger + assert triggered_dag_run.run_type == DagRunType.MANUAL assert triggered_dag_run.logical_date == DEFAULT_DATE self.assert_extra_link(triggered_dag_run, task, session) @@ -286,7 +285,7 @@ def test_trigger_dagrun_with_templated_trigger_dag_id(self, dag_maker): dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() assert len(dagruns) == 1 triggered_dag_run = dagruns[0] - assert triggered_dag_run.external_trigger + assert triggered_dag_run.run_type == DagRunType.MANUAL assert triggered_dag_run.dag_id == TRIGGERED_DAG_ID self.assert_extra_link(triggered_dag_run, task, session) @@ -454,7 +453,7 @@ def test_trigger_dagrun_with_reset_dag_run_true( with create_session() as session: dag_runs = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() assert len(dag_runs) == expected_dagruns_count - assert dag_runs[0].external_trigger + assert dag_runs[0].run_type == DagRunType.MANUAL def test_trigger_dagrun_with_wait_for_completion_true(self, dag_maker): """Test TriggerDagRunOperator with wait_for_completion.""" diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 84a63674e5119..d48baa178fae1 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -302,7 +302,7 @@ def make_user_defined_macro_filter_dag(): """ def compute_last_dagrun(dag: DAG): - return dag.get_last_dagrun(include_externally_triggered=True) + return dag.get_last_dagrun(include_manually_triggered=True) default_args = {"start_date": datetime(2019, 7, 10)} dag = DAG( diff --git a/tests/serialization/test_serialized_objects.py b/tests/serialization/test_serialized_objects.py index 06bb477becdf4..2b12770ee6fb9 100644 --- a/tests/serialization/test_serialized_objects.py +++ 
b/tests/serialization/test_serialized_objects.py @@ -131,7 +131,6 @@ class Test: run_type=DagRunType.MANUAL, logical_date=timezone.utcnow(), start_date=timezone.utcnow(), - external_trigger=True, state=DagRunState.SUCCESS, ) DAG_RUN.id = 1 diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py index 63056337a5219..83e3b07d80ed9 100644 --- a/tests/utils/test_db_cleanup.py +++ b/tests/utils/test_db_cleanup.py @@ -48,6 +48,7 @@ run_cleanup, ) from airflow.utils.session import create_session +from airflow.utils.types import DagRunType from tests_common.test_utils.db import ( clear_db_assets, @@ -183,17 +184,23 @@ def test_run_cleanup_dry_run(self, do_delete, check_rows_mock, dry_run): do_delete.assert_called() @pytest.mark.parametrize( - "table_name, date_add_kwargs, expected_to_delete, external_trigger", + "table_name, date_add_kwargs, expected_to_delete, run_type", [ - pytest.param("task_instance", dict(days=0), 0, False, id="beginning"), - pytest.param("task_instance", dict(days=4), 4, False, id="middle"), - pytest.param("task_instance", dict(days=9), 9, False, id="end_exactly"), - pytest.param("task_instance", dict(days=9, microseconds=1), 10, False, id="beyond_end"), - pytest.param("dag_run", dict(days=9, microseconds=1), 9, False, id="beyond_end_dr"), - pytest.param("dag_run", dict(days=9, microseconds=1), 10, True, id="beyond_end_dr_external"), + pytest.param("task_instance", dict(days=0), 0, DagRunType.SCHEDULED, id="beginning"), + pytest.param("task_instance", dict(days=4), 4, DagRunType.SCHEDULED, id="middle"), + pytest.param("task_instance", dict(days=9), 9, DagRunType.SCHEDULED, id="end_exactly"), + pytest.param( + "task_instance", dict(days=9, microseconds=1), 10, DagRunType.SCHEDULED, id="beyond_end" + ), + pytest.param( + "dag_run", dict(days=9, microseconds=1), 9, DagRunType.SCHEDULED, id="beyond_end_dr" + ), + pytest.param( + "dag_run", dict(days=9, microseconds=1), 10, DagRunType.MANUAL, id="beyond_end_dr_external" + ), ], ) - 
def test__build_query(self, table_name, date_add_kwargs, expected_to_delete, external_trigger): + def test__build_query(self, table_name, date_add_kwargs, expected_to_delete, run_type): """ Verify that ``_build_query`` produces a query that would delete the right task instance records depending on the value of ``clean_before_timestamp``. @@ -208,7 +215,7 @@ def test__build_query(self, table_name, date_add_kwargs, expected_to_delete, ext create_tis( base_date=base_date, num_tis=10, - external_trigger=external_trigger, + run_type=run_type, ) target_table_name = "_airflow_temp_table_name" with create_session() as session: @@ -225,17 +232,23 @@ def test__build_query(self, table_name, date_add_kwargs, expected_to_delete, ext assert row[0] == expected_to_delete @pytest.mark.parametrize( - "table_name, date_add_kwargs, expected_to_delete, external_trigger", + "table_name, date_add_kwargs, expected_to_delete, run_type", [ - pytest.param("task_instance", dict(days=0), 0, False, id="beginning"), - pytest.param("task_instance", dict(days=4), 4, False, id="middle"), - pytest.param("task_instance", dict(days=9), 9, False, id="end_exactly"), - pytest.param("task_instance", dict(days=9, microseconds=1), 10, False, id="beyond_end"), - pytest.param("dag_run", dict(days=9, microseconds=1), 9, False, id="beyond_end_dr"), - pytest.param("dag_run", dict(days=9, microseconds=1), 10, True, id="beyond_end_dr_external"), + pytest.param("task_instance", dict(days=0), 0, DagRunType.SCHEDULED, id="beginning"), + pytest.param("task_instance", dict(days=4), 4, DagRunType.SCHEDULED, id="middle"), + pytest.param("task_instance", dict(days=9), 9, DagRunType.SCHEDULED, id="end_exactly"), + pytest.param( + "task_instance", dict(days=9, microseconds=1), 10, DagRunType.SCHEDULED, id="beyond_end" + ), + pytest.param( + "dag_run", dict(days=9, microseconds=1), 9, DagRunType.SCHEDULED, id="beyond_end_dr" + ), + pytest.param( + "dag_run", dict(days=9, microseconds=1), 10, DagRunType.MANUAL, 
id="beyond_end_dr_external" + ), ], ) - def test__cleanup_table(self, table_name, date_add_kwargs, expected_to_delete, external_trigger): + def test__cleanup_table(self, table_name, date_add_kwargs, expected_to_delete, run_type): """ Verify that _cleanup_table actually deletes the rows it should. @@ -254,7 +267,7 @@ def test__cleanup_table(self, table_name, date_add_kwargs, expected_to_delete, e create_tis( base_date=base_date, num_tis=num_tis, - external_trigger=external_trigger, + run_type=run_type, ) with create_session() as session: clean_before_date = base_date.add(**date_add_kwargs) @@ -549,7 +562,7 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl confirm_mock.assert_not_called() -def create_tis(base_date, num_tis, external_trigger=False): +def create_tis(base_date, num_tis, run_type=DagRunType.SCHEDULED): with create_session() as session: dag = DagModel(dag_id=f"test-dag_{uuid4()}") session.add(dag) @@ -558,9 +571,8 @@ def create_tis(base_date, num_tis, external_trigger=False): dag_run = DagRun( dag.dag_id, run_id=f"abc_{num}", - run_type="none", + run_type=run_type, start_date=start_date, - external_trigger=external_trigger, ) ti = TaskInstance( PythonOperator(task_id="dummy-task", python_callable=print), run_id=dag_run.run_id diff --git a/tests/www/test_utils.py b/tests/www/test_utils.py index 2e652f7811a63..7aa99c8f05d51 100644 --- a/tests/www/test_utils.py +++ b/tests/www/test_utils.py @@ -506,7 +506,6 @@ def test_wrapped_markdown_with_raw_html(self, html): "data_interval_start": None, "end_date": None, "logical_date": None, - "external_trigger": None, "last_scheduling_decision": None, "note": None, "queued_at": None, diff --git a/tests/www/views/test_views_grid.py b/tests/www/views/test_views_grid.py index e2181aa702d25..2ade6508693c3 100644 --- a/tests/www/views/test_views_grid.py +++ b/tests/www/views/test_views_grid.py @@ -232,7 +232,6 @@ def test_one_run(admin_client, dag_with_runs: list[DagRun], session): 
"data_interval_start": "2016-01-01T00:00:00+00:00", "end_date": timezone.utcnow().isoformat(), "logical_date": "2016-01-01T00:00:00+00:00", - "external_trigger": False, "last_scheduling_decision": None, "note": None, "queued_at": None, @@ -249,7 +248,6 @@ def test_one_run(admin_client, dag_with_runs: list[DagRun], session): "data_interval_start": "2016-01-02T00:00:00+00:00", "end_date": None, "logical_date": "2016-01-02T00:00:00+00:00", - "external_trigger": False, "last_scheduling_decision": None, "note": None, "queued_at": None,