Skip to content

Commit

Permalink
[AIRFLOW-1124] Do not set all tasks to scheduled in backfill
Browse files Browse the repository at this point in the history
Backfill is supposed to fill in the blanks — not to reschedule
all tasks. This fixes a regression from 1.8.0.

Closes apache#2247 from bolkedebruin/AIRFLOW-1124
  • Loading branch information
bolkedebruin committed Apr 19, 2017
1 parent 6684597 commit 0406462
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 1 deletion.
3 changes: 2 additions & 1 deletion airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1821,7 +1821,8 @@ def _execute(self):

for ti in run.get_task_instances():
# all tasks part of the backfill are scheduled to run
ti.set_state(State.SCHEDULED, session=session)
if ti.state == State.NONE:
ti.set_state(State.SCHEDULED, session=session)
tasks_to_run[ti.key] = ti

next_run_date = self.dag.following_schedule(next_run_date)
Expand Down
69 changes: 69 additions & 0 deletions tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,75 @@ def test_sub_set_subdag(self):
else:
self.assertEqual(State.NONE, ti.state)

def test_backfill_fill_blanks(self):
    """Backfill must only pick up task instances still in State.NONE.

    Task instances that already carry a state (failed, skipped,
    upstream_failed, ...) must keep it; runnable ones (none, retry,
    scheduled) should be executed to success. Guards the AIRFLOW-1124
    regression where backfill force-scheduled every task.
    """
    dag = DAG(
        'test_backfill_fill_blanks',
        start_date=DEFAULT_DATE,
        default_args={'owner': 'owner1'},
    )

    with dag:
        op1 = DummyOperator(task_id='op1')
        op2 = DummyOperator(task_id='op2')
        op3 = DummyOperator(task_id='op3')
        op4 = DummyOperator(task_id='op4')
        op5 = DummyOperator(task_id='op5')
        op6 = DummyOperator(task_id='op6')

    dag.clear()
    dr = dag.create_dagrun(run_id='test',
                           state=State.SUCCESS,
                           execution_date=DEFAULT_DATE,
                           start_date=DEFAULT_DATE)
    executor = TestExecutor(do_update=True)

    session = settings.Session()

    # Seed each task instance with a distinct pre-backfill state.
    # op6 is deliberately left untouched (State.NONE) so backfill
    # has a genuine blank to fill in.
    seed_states = {
        op1.task_id: State.UP_FOR_RETRY,
        op2.task_id: State.FAILED,
        op3.task_id: State.SKIPPED,
        op4.task_id: State.SCHEDULED,
        op5.task_id: State.UPSTREAM_FAILED,
    }
    for ti in dr.get_task_instances():
        seeded = seed_states.get(ti.task_id)
        if seeded is not None:
            ti.state = seeded
        if ti.task_id == op1.task_id:
            # a retry needs an end_date to look like a finished attempt
            ti.end_date = DEFAULT_DATE
        session.merge(ti)
    session.commit()
    session.close()

    job = BackfillJob(dag=dag,
                      start_date=DEFAULT_DATE,
                      end_date=DEFAULT_DATE,
                      executor=executor)
    self.assertRaisesRegexp(
        AirflowException,
        'Some task instances failed',
        job.run)

    # the run_id should have changed, so a refresh won't work
    self.assertRaises(sqlalchemy.orm.exc.NoResultFound, dr.refresh_from_db)
    dr = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE)[0]

    self.assertEqual(dr.state, State.FAILED)

    # Runnable seeds (retry, scheduled, none) end in SUCCESS; every
    # other pre-existing state must have been left alone by backfill.
    expected_states = {
        op1.task_id: State.SUCCESS,
        op2.task_id: State.FAILED,
        op3.task_id: State.SKIPPED,
        op4.task_id: State.SUCCESS,
        op5.task_id: State.UPSTREAM_FAILED,
        op6.task_id: State.SUCCESS,
    }
    for ti in dr.get_task_instances():
        self.assertEqual(ti.state, expected_states[ti.task_id])

def test_backfill_execute_subdag(self):
dag = self.dagbag.get_dag('example_subdag_operator')
subdag_op_task = dag.get_task('section-1')
Expand Down

0 comments on commit 0406462

Please sign in to comment.