Skip to content

Commit

Permalink
[AIRFLOW-558] Add Support for dag.catchup=(True|False) Option
Browse files Browse the repository at this point in the history
Added a dag.catchup option and modified the
scheduler to look at the value when scheduling
DagRuns
(by moving dag.start_date up to
dag.previous_schedule),
and added a config option catchup_by_default
(defaults to True) that allows users to set this
to False for all
dags without modifying the existing DAGs

In addition, we added a test to jobs.py
(test_dag_catchup_option)

Closes apache#1830 from
btallman/NoBackfill_clean_feature
  • Loading branch information
btallman authored and bolkedebruin committed Jan 13, 2017
1 parent e0f5c0c commit 1caaceb
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 10 deletions.
10 changes: 10 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,14 @@ def run_command(command):
# associated task instance as failed and will re-schedule the task.
scheduler_zombie_task_threshold = 300
# Turn off scheduler catchup by setting this to False.
# The default (True) leaves the existing behavior unchanged.
# When this is False the scheduler will not perform catchup,
# although Command Line Backfills still work.
# The value can also be set on a per DAG basis in the
# DAG definition (catchup)
catchup_by_default = True
# Statsd (https://github.com/etsy/statsd) integration settings
statsd_on = False
statsd_host = localhost
Expand Down Expand Up @@ -486,6 +494,8 @@ def run_command(command):
scheduler_heartbeat_sec = 5
authenticate = true
max_threads = 2
catchup_by_default = True
scheduler_zombie_task_threshold = 300
"""


Expand Down
23 changes: 23 additions & 0 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,25 @@ def create_dag_run(self, dag, session=None):
if dag.schedule_interval == '@once' and last_scheduled_run:
return None

# don't do scheduler catchup for DAGs that don't have dag.catchup = True
if not dag.catchup:
# The logic is that we move start_date up until
# one period before, so that datetime.now() is AFTER
# the period end, and the job can be created...
now = datetime.now()
next_start = dag.following_schedule(now)
last_start = dag.previous_schedule(now)
if next_start <= now:
new_start = last_start
else:
new_start = dag.previous_schedule(last_start)

if dag.start_date:
if new_start >= dag.start_date:
dag.start_date = new_start
else:
dag.start_date = new_start

next_run_date = None
if not last_scheduled_run:
# First run
Expand Down Expand Up @@ -756,6 +775,10 @@ def create_dag_run(self, dag, session=None):
self.logger.debug("Dag start date: {}. Next run date: {}"
.format(dag.start_date, next_run_date))

# don't ever schedule in the future
if next_run_date > datetime.now():
return

# this structure is necessary to avoid a TypeError from concatenating
# NoneType
if dag.schedule_interval == '@once':
Expand Down
55 changes: 49 additions & 6 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1023,12 +1023,28 @@ def are_dependents_done(self, session=None):
@provide_session
def previous_ti(self, session=None):
    """
    The task instance for the task that ran before this task instance.

    :param session: database session passed through to the DagRun lookups
    :return: the previous TaskInstance, or None when the task has no DAG,
        no earlier schedule exists, or no previous DagRun / TaskInstance
        is found
    """
    dag = self.task.dag
    if not dag:
        return None

    dr = self.get_dagrun(session=session)
    if not dr:
        # No DagRun exists for this TI (original author's note: the TI is
        # being run outside of a DagRun), so derive the previous execution
        # date from the DAG's schedule instead of from stored DagRuns.
        previous_scheduled_date = dag.previous_schedule(self.execution_date)
        if not previous_scheduled_date:
            return None
        return TaskInstance(task=self.task,
                            execution_date=previous_scheduled_date)

    # With catchup, the previous run is the one exactly one schedule interval
    # back; without it, the previous run is simply the latest earlier DagRun.
    if dag.catchup:
        last_dagrun = dr.get_previous_scheduled_dagrun(session=session)
    else:
        last_dagrun = dr.get_previous_dagrun(session=session)

    if last_dagrun:
        return last_dagrun.get_task_instance(self.task_id, session=session)

    return None

@provide_session
def are_dependencies_met(
Expand Down Expand Up @@ -2540,6 +2556,8 @@ class DAG(BaseDag, LoggingMixin):
:type sla_miss_callback: types.FunctionType
:param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT)
:type orientation: string
:param catchup: Perform scheduler catchup (or only run latest)? Defaults to True
:type catchup: bool
"""

def __init__(
Expand All @@ -2557,6 +2575,7 @@ def __init__(
dagrun_timeout=None,
sla_miss_callback=None,
orientation=configuration.get('webserver', 'dag_orientation'),
catchup=configuration.getboolean('scheduler', 'catchup_by_default'),
params=None):

self.user_defined_macros = user_defined_macros
Expand Down Expand Up @@ -2597,6 +2616,7 @@ def __init__(
self.dagrun_timeout = dagrun_timeout
self.sla_miss_callback = sla_miss_callback
self.orientation = orientation
self.catchup = catchup

self._comps = {
'dag_id',
Expand Down Expand Up @@ -3847,6 +3867,29 @@ def get_dag(self):

return self.dag

@provide_session
def get_previous_dagrun(self, session=None):
    """
    Return the latest DagRun of the same DAG whose execution date is
    strictly earlier than this run's, or None if there is no such run.
    """
    earlier_runs = session.query(DagRun).filter(
        DagRun.dag_id == self.dag_id,
        DagRun.execution_date < self.execution_date
    )
    # Newest first, so the first row is the immediately preceding run.
    newest_first = earlier_runs.order_by(DagRun.execution_date.desc())
    return newest_first.first()

@provide_session
def get_previous_scheduled_dagrun(self, session=None):
    """
    Return the DagRun of the same DAG whose execution date equals the
    schedule slot immediately before this run's, or None if this run has
    no DAG attached or no such DagRun exists.
    """
    dag = self.dag
    if not dag:
        return None

    query = session.query(DagRun)
    matching = query.filter(
        DagRun.dag_id == self.dag_id,
        DagRun.execution_date == dag.previous_schedule(self.execution_date)
    )
    return matching.first()

@provide_session
def update_state(self, session=None):
"""
Expand Down
20 changes: 16 additions & 4 deletions airflow/ti_deps/deps/prev_dagrun_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,22 @@ def _get_dep_statuses(self, ti, session, dep_context):
raise StopIteration

# Don't depend on the previous task instance if we are the first task
if ti.execution_date == ti.task.start_date:
yield self._passing_status(
reason="This task instance was the first task instance for it's task.")
raise StopIteration
dag = ti.task.dag
if dag.catchup:
if ti.execution_date == ti.task.start_date:
yield self._passing_status(
reason="This task instance was the first task instance for its task.")
raise StopIteration

else:

dr = ti.get_dagrun()
last_dagrun = dr.get_previous_dagrun() if dr else None

if not last_dagrun:
yield self._passing_status(
reason="This task instance was the first task instance for its task.")
raise StopIteration

previous_ti = ti.previous_ti
if not previous_ti:
Expand Down
52 changes: 52 additions & 0 deletions docs/scheduler.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ the run stamped ``2016-01-01`` will be trigger soon after ``2016-01-01T23:59``.
In other words, the job instance is started once the period it covers
has ended.

**Let's Repeat That** The scheduler runs your job one ``schedule_interval`` AFTER the
start date, at the END of the period.

The scheduler starts an instance of the executor specified in your
``airflow.cfg``. If it happens to be the ``LocalExecutor``, tasks will be
executed as subprocesses; in the case of ``CeleryExecutor`` and
Expand Down Expand Up @@ -72,6 +75,55 @@ should be triggered and come to a crawl. It might also create undesired
processing when changing the shape of your DAG, by say adding in new
tasks.

Backfill and Catchup
''''''''''''''''''''

An Airflow DAG with a ``start_date``, possibly an ``end_date``, and a ``schedule_interval`` defines a
series of intervals which the scheduler turns into individual DAG Runs and executes. A key capability of
Airflow is that these DAG Runs are atomic, idempotent items, and the scheduler, by default, will examine
the lifetime of the DAG (from start to end/now, one interval at a time) and kick off a DAG Run for any
interval that has not been run (or has been cleared). This concept is called Catchup.

If your DAG is written to handle its own catchup (i.e., not limited to the interval, but instead to "Now",
for instance), then you will want to turn catchup off, either on the DAG itself with ``dag.catchup =
False`` or by default at the configuration file level with ``catchup_by_default = False``. This
instructs the scheduler to only create a DAG Run for the most current instance of the DAG
interval series.

.. code:: python
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 12, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# schedule_interval is an argument of the DAG itself rather than a task
# default, so it is passed to the DAG constructor directly.
dag = DAG('tutorial', catchup=False, schedule_interval='@daily',
          default_args=default_args)
In the example above, if the DAG is picked up by the scheduler daemon (or from the command line) on
2016-01-02 at 6 AM, a single DAG Run will be created with an ``execution_date`` of 2016-01-01, and the next
one will be created just after midnight on the morning of 2016-01-03 with an ``execution_date`` of 2016-01-02.

If the ``dag.catchup`` value had been True instead, the scheduler would have created a DAG Run for each
completed interval between 2015-12-01 and 2016-01-02 (but not yet one for 2016-01-02, as that interval
hasn't completed) and would have executed them sequentially. This behavior is great for atomic
datasets that can easily be split into periods. Turning catchup off is great if your DAG Runs perform
backfill internally.

External Triggers
'''''''''''''''''

Expand Down
102 changes: 102 additions & 0 deletions tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,3 +1101,105 @@ def test_dag_get_active_runs(self):
running_date = 'Except'

self.assertEqual(execution_date, running_date, 'Running Date must match Execution Date')

def test_dag_catchup_option(self):
    """
    Test to check that a DAG with catchup = False only schedules beginning
    now, not back to the start date
    """

    def schedule_first_run(dag):
        """Add a three-task chain to ``dag``, register it, and return the
        DagRun the scheduler creates for it."""
        run_this_1 = DummyOperator(task_id='run_this_1', dag=dag)
        run_this_2 = DummyOperator(task_id='run_this_2', dag=dag)
        run_this_2.set_upstream(run_this_1)
        run_this_3 = DummyOperator(task_id='run_this_3', dag=dag)
        run_this_3.set_upstream(run_this_2)

        session = settings.Session()
        try:
            session.merge(DagModel(dag_id=dag.dag_id))
            session.commit()
        finally:
            # Release the session even when the merge/commit fails.
            session.close()

        scheduler = SchedulerJob()
        dag.clear()
        return scheduler.create_dag_run(dag)

    now = datetime.datetime.now()
    six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(
        minute=0, second=0, microsecond=0)
    three_minutes_ago = now - datetime.timedelta(minutes=3)
    two_hours_and_three_minutes_ago = (
        three_minutes_ago - datetime.timedelta(hours=2))

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': six_hours_ago_to_the_hour,
    }

    # A DAG that does not specify catchup must pick up the configured default.
    dag1 = DAG('no_catchup_test1',
               schedule_interval='* * * * *',
               max_active_runs=1,
               default_args=default_args)

    # Test configs have catchup by default ON
    default_catchup = configuration.getboolean('scheduler', 'catchup_by_default')
    self.assertTrue(default_catchup)

    # Correct default?
    self.assertTrue(dag1.catchup)

    # Every-minute schedule with catchup disabled.
    dag2 = DAG('no_catchup_test2',
               schedule_interval='* * * * *',
               max_active_runs=1,
               catchup=False,
               default_args=default_args)
    dr = schedule_first_run(dag2)

    # We had better get a dag run
    self.assertIsNotNone(dr)

    # The DR should be scheduled in the last 3 minutes, not 6 hours ago
    self.assertGreater(dr.execution_date, three_minutes_ago)

    # The DR should be scheduled BEFORE now
    self.assertLess(dr.execution_date, datetime.datetime.now())

    # Hourly schedule with catchup disabled.
    dag3 = DAG('no_catchup_test3',
               schedule_interval='@hourly',
               max_active_runs=1,
               catchup=False,
               default_args=default_args)
    dr = schedule_first_run(dag3)

    # We had better get a dag run
    self.assertIsNotNone(dr)

    # The DR should be scheduled in the last two hours, not 6 hours ago
    self.assertGreater(dr.execution_date, two_hours_and_three_minutes_ago)

    # The DR should be scheduled BEFORE now
    self.assertLess(dr.execution_date, datetime.datetime.now())

0 comments on commit 1caaceb

Please sign in to comment.