AIP-83 amendment -- update backfill (apache#46248)
prabhusneha authored Feb 7, 2025
1 parent 9689cf5 commit 891c67f
Showing 1 changed file with 68 additions and 56 deletions.
124 changes: 68 additions & 56 deletions airflow/models/backfill.py
@@ -37,6 +37,7 @@
     func,
     select,
 )
+from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm import relationship, validates
 from sqlalchemy_jsonfield import JSONField

@@ -46,7 +47,7 @@
 from airflow.settings import json
 from airflow.utils import timezone
 from airflow.utils.session import create_session
-from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, with_row_locks
+from airflow.utils.sqlalchemy import UtcDateTime, nulls_first
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType

@@ -253,6 +254,28 @@ def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, sess
     return logical_dates


+def should_create_backfill_dag_run(
+    info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
+) -> bool:
+    """Determine if a backfill DAG run should be created and add a BackfillDagRun if required."""
+    dr = session.scalar(_get_latest_dag_run_row_query(info, session))
+    if not dr:
+        return False
+    non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior)
+    if non_create_reason:
+        session.add(
+            BackfillDagRun(
+                backfill_id=backfill_id,
+                dag_run_id=None,
+                logical_date=info.logical_date,
+                exception_reason=non_create_reason,
+                sort_ordinal=backfill_sort_ordinal,
+            )
+        )
+        return True
+    return False
+
+
 def _create_backfill_dag_run(
     *,
     dag: DAG,
@@ -263,57 +286,54 @@ def _create_backfill_dag_run(
     backfill_sort_ordinal,
     session,
 ):
-    with session.begin_nested() as nested:
-        dr = session.scalar(
-            with_row_locks(
-                query=_get_latest_dag_run_row_query(info, session),
-                session=session,
-            ),
-        )
-        if dr:
-            non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior)
-            if non_create_reason:
-                # rolling back here restores to start of this nested tran
-                # which releases the lock on the latest dag run, since we
-                # are not creating a new one
-                nested.rollback()
-                session.add(
-                    BackfillDagRun(
-                        backfill_id=backfill_id,
-                        dag_run_id=None,
-                        logical_date=info.logical_date,
-                        exception_reason=non_create_reason,
-                        sort_ordinal=backfill_sort_ordinal,
-                    )
-                )
-                return
+    with session.begin_nested():
+        should_skip_create_backfill = should_create_backfill_dag_run(
+            info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
+        )
+        if should_skip_create_backfill:
+            return
+
         dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
-        dr = dag.create_dagrun(
-            run_id=dag.timetable.generate_run_id(
-                run_type=DagRunType.BACKFILL_JOB,
-                logical_date=info.logical_date,
-                data_interval=info.data_interval,
-            ),
-            logical_date=info.logical_date,
-            data_interval=info.data_interval,
-            run_after=info.run_after,
-            conf=dag_run_conf,
-            run_type=DagRunType.BACKFILL_JOB,
-            triggered_by=DagRunTriggeredByType.BACKFILL,
-            dag_version=dag_version,
-            state=DagRunState.QUEUED,
-            start_date=timezone.utcnow(),
-            backfill_id=backfill_id,
-            session=session,
-        )
-        session.add(
-            BackfillDagRun(
-                backfill_id=backfill_id,
-                dag_run_id=dr.id,
-                sort_ordinal=backfill_sort_ordinal,
-                logical_date=info.logical_date,
-            )
-        )
+        try:
+            dr = dag.create_dagrun(
+                run_id=dag.timetable.generate_run_id(
+                    run_type=DagRunType.BACKFILL_JOB,
+                    logical_date=info.logical_date,
+                    data_interval=info.data_interval,
+                ),
+                logical_date=info.logical_date,
+                data_interval=info.data_interval,
+                run_after=info.run_after,
+                conf=dag_run_conf,
+                run_type=DagRunType.BACKFILL_JOB,
+                triggered_by=DagRunTriggeredByType.BACKFILL,
+                dag_version=dag_version,
+                state=DagRunState.QUEUED,
+                start_date=timezone.utcnow(),
+                backfill_id=backfill_id,
+                session=session,
+            )
+            session.add(
+                BackfillDagRun(
+                    backfill_id=backfill_id,
+                    dag_run_id=dr.id,
+                    sort_ordinal=backfill_sort_ordinal,
+                    logical_date=info.logical_date,
+                )
+            )
+        except IntegrityError:
+            log.info(
+                "Skipped creating backfill dag run for dag_id=%s backfill_id=%s, logical_date=%s (already exists)",
+                dag.dag_id,
+                backfill_id,
+                info.logical_date,
+            )
+            log.info("Doing session rollback.")
+            session.rollback()
+
+            should_create_backfill_dag_run(
+                info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
+            )


def _get_info_list(
@@ -389,16 +409,8 @@ def _create_backfill(
         dag=dag,
     )

-    log.info("obtaining lock on dag %s", dag_id)
-    # we obtain a lock on dag model so that nothing else will create
-    # dag runs at the same time. mainly this is required by non-uniqueness
-    # of logical_date. otherwise we could just create run in a try-except.
-    dag_model = session.scalar(
-        with_row_locks(
-            select(DagModel).where(DagModel.dag_id == dag_id),
-            session=session,
-        )
-    )
+    dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))
+
     if not dag_model:
         raise RuntimeError(f"Dag {dag_id} not found")

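For illustration, here is a minimal, standalone sketch of the pattern this change moves to: instead of pre-locking rows, the insert is attempted inside a nested transaction (SAVEPOINT), and a duplicate is detected by catching IntegrityError raised by a unique constraint. The Run model and the create_run_if_absent helper below are hypothetical stand-ins, not Airflow's actual models or API.

# sketch_backfill_dedup.py - illustrative only; "Run" and its columns are made up,
# not Airflow's DagRun/BackfillDagRun models.
from sqlalchemy import Column, Integer, String, UniqueConstraint
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Run(Base):
    """Stand-in table with the kind of uniqueness the real dag_run table enforces."""

    __tablename__ = "run"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String, nullable=False)
    logical_date = Column(String, nullable=False)
    __table_args__ = (UniqueConstraint("dag_id", "logical_date"),)


def create_run_if_absent(session: Session, dag_id: str, logical_date: str) -> bool:
    """Insert a run inside a SAVEPOINT; return False if one already exists."""
    try:
        with session.begin_nested():  # SAVEPOINT around just this insert
            session.add(Run(dag_id=dag_id, logical_date=logical_date))
            session.flush()  # emit the INSERT now so a duplicate raises here
        return True
    except IntegrityError:
        # Only the savepoint is rolled back; the enclosing transaction stays
        # usable, so the caller can still record why the run was skipped.
        return False

On PostgreSQL and MySQL (InnoDB), begin_nested() maps onto native SAVEPOINTs and this works as-is; with SQLite's pysqlite driver, the SAVEPOINT workaround described in SQLAlchemy's pysqlite documentation is needed for the rollback behavior to be correct.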
