Allow for retry when tasks are stuck in queued #43520

Merged: 66 commits on Nov 16, 2024
Changes from 6 commits
Commits
66 commits
8eb60b1
Handle the scenario where a task is queued for longer than `task_queu…
dimberman Nov 1, 2024
066f672
address feedback
dimberman Nov 1, 2024
93cb9ce
remove uv.lock
dimberman Nov 1, 2024
4a5c9e5
Update airflow/jobs/scheduler_job_runner.py
dimberman Nov 1, 2024
076c257
We need to ensure that older versions of airflow don't run into issue…
dimberman Nov 1, 2024
f46d4b0
Merge branch 'handle-stuck-in-queue' of https://github.com/apache/air…
dimberman Nov 1, 2024
4fe701e
Merge branch 'main' into handle-stuck-in-queue
dimberman Nov 1, 2024
9987963
address feedback
dimberman Nov 1, 2024
c39e52f
Merge branch 'handle-stuck-in-queue' of https://github.com/apache/air…
dimberman Nov 1, 2024
05fc02d
pre-commit
dimberman Nov 1, 2024
a1564cc
Update airflow/jobs/scheduler_job_runner.py
dimberman Nov 1, 2024
cbc3453
Merge branch 'main' into handle-stuck-in-queue
dimberman Nov 1, 2024
4950536
Handle the scenario where a task is queued for longer than `task_queu…
dimberman Nov 4, 2024
1c34eea
Merge branch 'handle-stuck-in-queue' of https://github.com/apache/air…
dimberman Nov 4, 2024
2113384
Update airflow/config_templates/config.yml
dimberman Nov 4, 2024
c18f8f4
k8s support
dimberman Nov 5, 2024
6d63990
Merge branch 'handle-stuck-in-queue' of https://github.com/apache/air…
dimberman Nov 5, 2024
5fdf2ec
Merge branch 'main' of https://github.com/apache/airflow into handle-…
dimberman Nov 5, 2024
96611db
Merge branch 'main' of https://github.com/apache/airflow into handle-…
dimberman Nov 6, 2024
ffd1a8e
tested against kubernetes_executor
dimberman Nov 6, 2024
116d130
Simplify the handle stuck in queued interface (#43647)
dstandish Nov 7, 2024
bf45241
fix
dimberman Nov 7, 2024
e232c18
Merge branch 'handle-stuck-in-queue' of https://github.com/apache/air…
dimberman Nov 7, 2024
0935be1
Merge branch 'main' of https://github.com/apache/airflow into handle-…
dimberman Nov 7, 2024
fa90a44
remove commented code
dstandish Nov 7, 2024
e28e9b7
simplify / make more acurrate the log info in backcompat case
dstandish Nov 7, 2024
702b1e4
remove redundant event log
dstandish Nov 7, 2024
16f62ad
cleanup
dimberman Nov 7, 2024
9296af5
nit
dimberman Nov 7, 2024
545c1fc
fix test
dstandish Nov 8, 2024
de68cc0
fix test
dstandish Nov 8, 2024
097801d
handle when func not implemented
dstandish Nov 8, 2024
7ee94e3
reorder the log add to follow the actual requeue
dstandish Nov 8, 2024
872c326
remove obsolete event
dstandish Nov 8, 2024
51c49c2
tweak event language again
dstandish Nov 8, 2024
2bec817
just remove from running set instead of pretending that we are changi…
dstandish Nov 8, 2024
c3a39c2
Merge branch 'main' of https://github.com/apache/airflow into handle-…
dimberman Nov 8, 2024
1ace902
Merge branch 'handle-stuck-in-queue' of https://github.com/apache/air…
dimberman Nov 8, 2024
6e8a1c2
fix test
dimberman Nov 8, 2024
ef55592
fix test
dimberman Nov 8, 2024
5638743
Merge branch 'main' into handle-stuck-in-queue
dstandish Nov 8, 2024
f475d12
Revert signature change in `cleanup_stuck_queued_tasks`
dstandish Nov 8, 2024
3393797
Revert "Revert signature change in `cleanup_stuck_queued_tasks`"
dstandish Nov 8, 2024
c40284d
Refactor this beast to deprecate old function and add new one
dstandish Nov 8, 2024
b8a123d
fix tests
dstandish Nov 8, 2024
e20bea7
cleanups
dstandish Nov 8, 2024
6e1dea6
fix readability
dstandish Nov 8, 2024
72e2e0e
mark as deprecated / add todo
dstandish Nov 8, 2024
1999572
move log line
dstandish Nov 8, 2024
857ddb8
fix test
dstandish Nov 13, 2024
0fddff5
docstring / spelling
dstandish Nov 13, 2024
88f8785
fix test
dstandish Nov 13, 2024
c28ef97
Merge branch 'main' into handle-stuck-in-queue
dstandish Nov 13, 2024
fd99f57
fix tests
dstandish Nov 13, 2024
b62eeb9
don't create an executor event; just fail the task
dstandish Nov 14, 2024
5dce100
Merge branch 'main' into handle-stuck-in-queue
dstandish Nov 14, 2024
c4f4204
Change this from "clean up stuck queued" to "revoke_task"
dstandish Nov 15, 2024
4bd5dbe
update tests
dstandish Nov 15, 2024
37b765a
fix docstring
dstandish Nov 15, 2024
4a503eb
docstring
dstandish Nov 15, 2024
fe8afe0
make the param undocumented
dstandish Nov 15, 2024
ea1faf9
fix fallback
dstandish Nov 15, 2024
f5eba8c
ensure task removed from running / queued
dstandish Nov 15, 2024
14b233e
update test
dstandish Nov 15, 2024
3aa1f37
fix test mistake
dstandish Nov 15, 2024
1d85dbf
small nits
dstandish Nov 15, 2024
8 changes: 8 additions & 0 deletions airflow/config_templates/config.yml
@@ -386,6 +386,14 @@ core:
type: integer
example: ~
default: "30"
num_stuck_reschedules:
description: |
Number of times Airflow will reschedule a task that gets stuck in queued before marking the
task as failed
version_added: 2.10.0
type: integer
example: ~
default: "2"
xcom_backend:
description: |
Path to custom XCom class that will be used to store and resolve operators results
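For context (not part of this diff): a rough sketch of how the new option combines with the existing queued-timeout settings. It assumes the option lands in `[core]` as shown above, that `task_queued_timeout` and `task_queued_timeout_check_interval` stay in `[scheduler]` as in stock Airflow, and the arithmetic is only a worst-case approximation:

```python
# Hedged sketch, not part of the PR: approximate worst-case time a permanently stuck task
# can spend in queued before the scheduler finally fails it.
from airflow.configuration import conf


def approx_max_seconds_stuck_in_queued() -> float:
    task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")  # stock default: 600
    check_interval = conf.getfloat("scheduler", "task_queued_timeout_check_interval")  # stock default: 120
    num_stuck_reschedules = conf.getint("core", "num_stuck_reschedules")  # default "2" in this PR

    # Each cycle the task can sit up to task_queued_timeout, plus up to one check interval
    # before the periodic stuck-in-queued check runs again and notices it.
    per_cycle = task_queued_timeout + check_interval
    # num_stuck_reschedules requeues, then one final cycle that ends with the task being failed.
    return (num_stuck_reschedules + 1) * per_cycle
```

With the defaults above that is roughly (2 + 1) * (600 + 120) = 2160 seconds, i.e. about 36 minutes before a task that never leaves queued is marked failed.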
69 changes: 61 additions & 8 deletions airflow/jobs/scheduler_job_runner.py
@@ -98,6 +98,8 @@
DR = DagRun
DM = DagModel

RETRY_STUCK_IN_QUEUED_EVENT = "retrying stuck in queued"


class ConcurrencyMap:
"""
@@ -1078,7 +1080,7 @@ def _run_scheduler_loop(self) -> None:

timers.call_regular_interval(
conf.getfloat("scheduler", "task_queued_timeout_check_interval"),
self._fail_tasks_stuck_in_queued,
self._handle_tasks_stuck_in_queued,
)

timers.call_regular_interval(
@@ -1785,15 +1787,23 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackReques
self.log.debug("callback is empty")

@provide_session
def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
"""
Mark tasks stuck in queued for longer than `task_queued_timeout` as failed.

Handle the scenario where a task is queued for longer than `task_queued_timeout`.

Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses
track of a task, a cluster can't further scale up its workers, etc.), but tasks
should not be stuck in queued for a long time. This will mark tasks stuck in
queued for longer than `self._task_queued_timeout` as failed. If the task has
available retries, it will be retried.
should not be stuck in queued for a long time.

Originally, we simply marked a task as failed when it was stuck in queued for
too long. We found that this led to suboptimal outcomes, as ideally we would like "failed"
to mean that a task was unable to run, rather than that we were unable to run the task.

As a compromise between always failing a stuck task and always rescheduling a stuck task (which could
lead to tasks being stuck in queued forever without informing the user), we have created the config
option `num_stuck_reschedules` (`AIRFLOW__CORE__NUM_STUCK_RESCHEDULES`). With this new configuration,
an Airflow admin can decide how sensitive their deployment should be with respect to failing stuck tasks.
"""
self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued method")

@@ -1805,6 +1815,7 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
)
).all()

num_allowed_retries = conf.getint("core", "num_stuck_reschedules")
for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
try:
cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
Expand All @@ -1817,17 +1828,59 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
)
session.add(
Log(
event="stuck in queued",
event=RETRY_STUCK_IN_QUEUED_EVENT,
task_instance=ti.key,
extra=(
"Task will be marked as failed. If the task instance has "
f"Task was stuck in queued and will be requeued, once it has hit {num_allowed_retries} attempts"
"Task will be marked as failed. After that, if the task instance has "
"available retries, it will be retried."
),
)
)

num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
if num_times_stuck < num_allowed_retries:
executor.change_state(ti.key, State.SCHEDULED)
else:
session.add(
Log(
event="failing stuck in queued",
task_instance=ti.key,
extra=(
"Task will be marked as failed. If the task instance has "
"available retries, it will be retried."
),
)
)
executor.fail(ti.key)


except NotImplementedError:
self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")

@provide_session
def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int:
"""
Check the Log table to see how many times a task instance has been stuck in queued.

We can then use this information to determine whether to reschedule a task or fail it.
"""
return session.query(Log).where(
Log.task_id == ti.task_id,
Log.dag_id == ti.dag_id,
Log.run_id == ti.run_id,
Log.map_index == ti.map_index,
Log.try_number == ti.try_number,
Log.event == RETRY_STUCK_IN_QUEUED_EVENT,
).count()

@provide_session
def _reset_task_instance(self, ti: TaskInstance, session: Session = NEW_SESSION):
ti.external_executor_id = None
ti.state = State.SCHEDULED
session.merge(ti)
session.commit()

@provide_session
def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
from airflow.models.pool import Pool
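Because the retry counter is derived from `Log` rows (the `_get_num_times_stuck_in_queued` query above) rather than a new column, the same event can be inspected after the fact. A hypothetical audit snippet, not part of the PR; `stuck_requeue_counts` is an illustrative name and only the `Log` columns already used by the query above are assumed:

```python
# Hypothetical admin helper: count how often tasks in a DAG were requeued because they were
# stuck in queued, using the same Log event string the scheduler writes in this PR.
from collections import Counter

from airflow.models import Log
from airflow.utils.session import create_session

RETRY_STUCK_IN_QUEUED_EVENT = "retrying stuck in queued"


def stuck_requeue_counts(dag_id: str) -> Counter:
    """Return a Counter keyed by (task_id, run_id) with the number of stuck-in-queued requeues."""
    with create_session() as session:
        rows = (
            session.query(Log.task_id, Log.run_id)
            .filter(Log.dag_id == dag_id, Log.event == RETRY_STUCK_IN_QUEUED_EVENT)
            .all()
        )
    return Counter((task_id, run_id) for task_id, run_id in rows)
```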
@@ -450,7 +450,8 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
for ti in tis:
readable_tis.append(repr(ti))
task_instance_key = ti.key
self.fail(task_instance_key, None)
if Version(airflow_version) < Version("2.10.4"):
self.fail(task_instance_key)
celery_async_result = self.tasks.pop(task_instance_key, None)
if celery_async_result:
try:
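The hunk above is from the Celery executor: on Airflow versions before 2.10.4 it keeps failing the task itself for backwards compatibility, while on newer versions it only cleans up Celery state and lets the scheduler decide between requeue and fail. For other executors, a minimal sketch of the interface as it stands at this point in the PR (later commits rename it to `revoke_task`); `_cancel_workload` is a hypothetical backend-specific helper, and the `running` set / `queued_tasks` dict are the standard `BaseExecutor` attributes:

```python
# Minimal sketch of the executor-side contract at this stage of the PR, not a real provider.
from __future__ import annotations

from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance


class MyExecutor(BaseExecutor):
    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
        """Stop stuck workloads and return readable identifiers; the scheduler decides the TI state."""
        readable_tis = []
        for ti in tis:
            readable_tis.append(repr(ti))
            self._cancel_workload(ti.key)       # hypothetical: tear down whatever was launched
            self.running.discard(ti.key)        # stop tracking it so the scheduler owns its fate
            self.queued_tasks.pop(ti.key, None)
        return readable_tis

    def _cancel_workload(self, key) -> None:
        """Backend-specific cancellation, e.g. revoke a Celery task or delete a pod."""
        ...
```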
72 changes: 64 additions & 8 deletions tests/jobs/test_scheduler_job.py
@@ -2202,7 +2202,7 @@ def test_adopt_or_reset_orphaned_tasks_multiple_executors(self, dag_maker, mock_
# Second executor called for ti3
mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3])

def test_fail_stuck_queued_tasks(self, dag_maker, session, mock_executors):
def test_handle_stuck_queued_tasks(self, dag_maker, session, mock_executors):
with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
op1 = EmptyOperator(task_id="op1")
op2 = EmptyOperator(task_id="op2", executor="default_exec")
@@ -2228,7 +2228,7 @@ def test_fail_stuck_queued_tasks(self, dag_maker, session, mock_executors):
(None,): mock_executors[0],
("secondary_exec",): mock_executors[1],
}[x]
job_runner._fail_tasks_stuck_in_queued()
job_runner._handle_tasks_stuck_in_queued()

# Default executor is called for ti1 (no explicit executor override, so it uses the default) and ti2
# (which we explicitly marked for execution by the default executor)
@@ -2238,7 +2238,63 @@ def test_fail_stuck_queued_tasks(self, dag_maker, session, mock_executors):
mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1])
mock_executors[1].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3])

def test_fail_stuck_queued_tasks_raises_not_implemented(self, dag_maker, session, caplog):
def test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker, session, mock_executors):
with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
op1 = EmptyOperator(task_id="op1")
op2 = EmptyOperator(task_id="op2", executor="default_exec")

dr = dag_maker.create_dagrun()
ti1 = dr.get_task_instance(task_id=op1.task_id, session=session)
ti2 = dr.get_task_instance(task_id=op2.task_id, session=session)
for ti in [ti1, ti2]:
ti.state = State.QUEUED
ti.queued_dttm = timezone.utcnow() - timedelta(minutes=15)
session.commit()
scheduler_job = Job()
job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
job_runner._task_queued_timeout = 300

# We need to return the task instance representations so that the handler creates the Log entries and checks for retries
mock_executors[0].cleanup_stuck_queued_tasks.return_value = {repr(ti1), repr(ti2)}

with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock:
# The executors are mocked, so cannot be loaded/imported. Mock load_executor and return the
# correct object for the given input executor name.
loader_mock.side_effect = lambda *x: {
("default_exec",): mock_executors[0],
(None,): mock_executors[0],
("secondary_exec",): mock_executors[1],
}[x]
job_runner._handle_tasks_stuck_in_queued()

# If the task gets stuck in queued once, we reset it to scheduled
mock_executors[0].change_state.assert_has_calls(
calls=[
mock.call(ti1.key, "scheduled"),
mock.call(ti2.key, "scheduled"),
]
)
mock_executors[0].fail.assert_not_called()

with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock:
loader_mock.side_effect = lambda *x: {
("default_exec",): mock_executors[0],
(None,): mock_executors[0],
("secondary_exec",): mock_executors[1],
}[x]
job_runner._handle_tasks_stuck_in_queued()
mock_executors[0].fail.assert_not_called()
job_runner._handle_tasks_stuck_in_queued()

# If the task gets stuck in queued 3 or more times, we fail the task
mock_executors[0].fail.assert_has_calls(
calls=[
mock.call(ti1.key),
mock.call(ti2.key),
]
)

def test_handle_stuck_queued_tasks_raises_not_implemented(self, dag_maker, session, caplog):
with dag_maker("test_fail_stuck_queued_tasks"):
op1 = EmptyOperator(task_id="op1")

@@ -2253,7 +2309,7 @@ def test_fail_stuck_queued_tasks_raises_not_implemented(self, dag_maker, session
job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
job_runner._task_queued_timeout = 300
with caplog.at_level(logging.DEBUG):
job_runner._fail_tasks_stuck_in_queued()
job_runner._handle_tasks_stuck_in_queued()
assert "Executor doesn't support cleanup of stuck queued tasks. Skipping." in caplog.text

@mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")
@@ -5190,25 +5246,25 @@ def test_no_dagruns_would_stuck_in_running(self, dag_maker):
with dag_maker("test_dagrun_states_are_correct_2", start_date=date) as dag:
EmptyOperator(task_id="dummy_task")
for i in range(16):
dr = dag_maker.create_dagrun(run_id=f"dr2_run_{i+1}", state=State.RUNNING, execution_date=date)
dr = dag_maker.create_dagrun(run_id=f"dr2_run_{i + 1}", state=State.RUNNING, execution_date=date)
date = dr.execution_date + timedelta(hours=1)
dr16 = DagRun.find(run_id="dr2_run_16")
date = dr16[0].execution_date + timedelta(hours=1)
for i in range(16, 32):
dr = dag_maker.create_dagrun(run_id=f"dr2_run_{i+1}", state=State.QUEUED, execution_date=date)
dr = dag_maker.create_dagrun(run_id=f"dr2_run_{i + 1}", state=State.QUEUED, execution_date=date)
date = dr.execution_date + timedelta(hours=1)

# third dag and dagruns
date = timezone.datetime(2021, 1, 1)
with dag_maker("test_dagrun_states_are_correct_3", start_date=date) as dag:
EmptyOperator(task_id="dummy_task")
for i in range(16):
dr = dag_maker.create_dagrun(run_id=f"dr3_run_{i+1}", state=State.RUNNING, execution_date=date)
dr = dag_maker.create_dagrun(run_id=f"dr3_run_{i + 1}", state=State.RUNNING, execution_date=date)
date = dr.execution_date + timedelta(hours=1)
dr16 = DagRun.find(run_id="dr3_run_16")
date = dr16[0].execution_date + timedelta(hours=1)
for i in range(16, 32):
dr = dag_maker.create_dagrun(run_id=f"dr2_run_{i+1}", state=State.QUEUED, execution_date=date)
dr = dag_maker.create_dagrun(run_id=f"dr2_run_{i + 1}", state=State.QUEUED, execution_date=date)
date = dr.execution_date + timedelta(hours=1)

scheduler_job = Job()