Allow for retry when tasks are stuck in queued #43520

Merged: 66 commits, Nov 16, 2024
8eb60b1
Handle the scenario where a task is queued for longer than `task_queu…
dimberman Nov 1, 2024
066f672
address feedback
dimberman Nov 1, 2024
93cb9ce
remove uv.lock
dimberman Nov 1, 2024
4a5c9e5
Update airflow/jobs/scheduler_job_runner.py
dimberman Nov 1, 2024
076c257
We need to ensure that older versions of airflow don't run into issue…
dimberman Nov 1, 2024
f46d4b0
Merge branch 'handle-stuck-in-queue' of https://github.com/apache/air…
dimberman Nov 1, 2024
4fe701e
Merge branch 'main' into handle-stuck-in-queue
dimberman Nov 1, 2024
9987963
address feedback
dimberman Nov 1, 2024
c39e52f
Merge branch 'handle-stuck-in-queue' of https://github.com/apache/air…
dimberman Nov 1, 2024
05fc02d
pre-commit
dimberman Nov 1, 2024
a1564cc
Update airflow/jobs/scheduler_job_runner.py
dimberman Nov 1, 2024
cbc3453
Merge branch 'main' into handle-stuck-in-queue
dimberman Nov 1, 2024
4950536
Handle the scenario where a task is queued for longer than `task_queu…
dimberman Nov 4, 2024
1c34eea
Merge branch 'handle-stuck-in-queue' of https://github.com/apache/air…
dimberman Nov 4, 2024
2113384
Update airflow/config_templates/config.yml
dimberman Nov 4, 2024
c18f8f4
k8s support
dimberman Nov 5, 2024
6d63990
Merge branch 'handle-stuck-in-queue' of https://github.com/apache/air…
dimberman Nov 5, 2024
5fdf2ec
Merge branch 'main' of https://github.com/apache/airflow into handle-…
dimberman Nov 5, 2024
96611db
Merge branch 'main' of https://github.com/apache/airflow into handle-…
dimberman Nov 6, 2024
ffd1a8e
tested against kubernetes_executor
dimberman Nov 6, 2024
116d130
Simplify the handle stuck in queued interface (#43647)
dstandish Nov 7, 2024
bf45241
fix
dimberman Nov 7, 2024
e232c18
Merge branch 'handle-stuck-in-queue' of https://github.com/apache/air…
dimberman Nov 7, 2024
0935be1
Merge branch 'main' of https://github.com/apache/airflow into handle-…
dimberman Nov 7, 2024
fa90a44
remove commented code
dstandish Nov 7, 2024
e28e9b7
simplify / make more accurate the log info in backcompat case
dstandish Nov 7, 2024
702b1e4
remove redundant event log
dstandish Nov 7, 2024
16f62ad
cleanup
dimberman Nov 7, 2024
9296af5
nit
dimberman Nov 7, 2024
545c1fc
fix test
dstandish Nov 8, 2024
de68cc0
fix test
dstandish Nov 8, 2024
097801d
handle when func not implemented
dstandish Nov 8, 2024
7ee94e3
reorder the log add to follow the actual requeue
dstandish Nov 8, 2024
872c326
remove obsolete event
dstandish Nov 8, 2024
51c49c2
tweak event language again
dstandish Nov 8, 2024
2bec817
just remove from running set instead of pretending that we are changi…
dstandish Nov 8, 2024
c3a39c2
Merge branch 'main' of https://github.com/apache/airflow into handle-…
dimberman Nov 8, 2024
1ace902
Merge branch 'handle-stuck-in-queue' of https://github.com/apache/air…
dimberman Nov 8, 2024
6e8a1c2
fix test
dimberman Nov 8, 2024
ef55592
fix test
dimberman Nov 8, 2024
5638743
Merge branch 'main' into handle-stuck-in-queue
dstandish Nov 8, 2024
f475d12
Revert signature change in `cleanup_stuck_queued_tasks`
dstandish Nov 8, 2024
3393797
Revert "Revert signature change in `cleanup_stuck_queued_tasks`"
dstandish Nov 8, 2024
c40284d
Refactor this beast to deprecate old function and add new one
dstandish Nov 8, 2024
b8a123d
fix tests
dstandish Nov 8, 2024
e20bea7
cleanups
dstandish Nov 8, 2024
6e1dea6
fix readability
dstandish Nov 8, 2024
72e2e0e
mark as deprecated / add todo
dstandish Nov 8, 2024
1999572
move log line
dstandish Nov 8, 2024
857ddb8
fix test
dstandish Nov 13, 2024
0fddff5
docstring / spelling
dstandish Nov 13, 2024
88f8785
fix test
dstandish Nov 13, 2024
c28ef97
Merge branch 'main' into handle-stuck-in-queue
dstandish Nov 13, 2024
fd99f57
fix tests
dstandish Nov 13, 2024
b62eeb9
don't create an executor event; just fail the task
dstandish Nov 14, 2024
5dce100
Merge branch 'main' into handle-stuck-in-queue
dstandish Nov 14, 2024
c4f4204
Change this from "clean up stuck queued" to "revoke_task"
dstandish Nov 15, 2024
4bd5dbe
update tests
dstandish Nov 15, 2024
37b765a
fix docstring
dstandish Nov 15, 2024
4a503eb
docstring
dstandish Nov 15, 2024
fe8afe0
make the param undocumented
dstandish Nov 15, 2024
ea1faf9
fix fallback
dstandish Nov 15, 2024
f5eba8c
ensure task removed from running / queued
dstandish Nov 15, 2024
14b233e
update test
dstandish Nov 15, 2024
3aa1f37
fix test mistake
dstandish Nov 15, 2024
1d85dbf
small nits
dstandish Nov 15, 2024
27 changes: 25 additions & 2 deletions airflow/executors/base_executor.py
@@ -25,9 +25,11 @@
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple

import pendulum
from deprecated import deprecated

from airflow.cli.cli_config import DefaultHelpParser
from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import Log
from airflow.stats import Stats
@@ -552,7 +554,12 @@ def terminate(self):
"""Get called when the daemon receives a SIGTERM."""
raise NotImplementedError

def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover
@deprecated(
reason="Replaced by function `revoke_task`.",
category=RemovedInAirflow3Warning,
action="ignore",
)
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
"""
Handle remnants of tasks that were failed because they were stuck in queued.

@@ -563,7 +570,23 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # p
:param tis: List of Task Instances to clean up
:return: List of readable task instances for a warning message
"""
raise NotImplementedError()
raise NotImplementedError

def revoke_task(self, *, ti: TaskInstance):
"""
Attempt to remove task from executor.

It should attempt to ensure that the task is no longer running on the worker,
and ensure that it is cleared out from internal data structures.

It should *not* change the state of the task in airflow, or add any events
to the event buffer.

It should not raise any error.

:param ti: Task instance to remove
"""
raise NotImplementedError
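
The `revoke_task` contract described in the docstring above can be illustrated with a minimal sketch. Everything here is hypothetical and not part of the PR: `InMemoryExecutor`, `FakeTaskInstance`, and `_cancel_on_worker` are stand-ins so the example runs without Airflow installed. The point is only to show the three rules: clear internal structures, leave task state and the event buffer alone, and never raise.

```python
import logging
from dataclasses import dataclass, field

log = logging.getLogger(__name__)


@dataclass(frozen=True)
class FakeTaskInstance:
    """Hypothetical stand-in for a TaskInstance; only `key` is used here."""

    key: tuple


@dataclass
class InMemoryExecutor:
    """Hypothetical executor used to illustrate the `revoke_task` contract."""

    queued_tasks: dict = field(default_factory=dict)
    running: set = field(default_factory=set)
    event_buffer: dict = field(default_factory=dict)

    def _cancel_on_worker(self, key):
        """Placeholder for a backend-specific cancel call; it fails on purpose here."""
        raise ConnectionError("worker unreachable")

    def revoke_task(self, *, ti):
        # Best effort: try to stop the work on the worker, but never propagate errors.
        try:
            self._cancel_on_worker(ti.key)
        except Exception:
            log.exception("Error revoking task instance %s", ti.key)
        # Always clear internal bookkeeping, even if the cancel call failed.
        self.running.discard(ti.key)
        self.queued_tasks.pop(ti.key, None)
        # Deliberately no state change and no event_buffer entry: the scheduler
        # decides whether the task is requeued or failed.
```

In this sketch the cleanup runs after the `try` block rather than inside it, so a failing cancel call cannot leave the key behind in `running` or `queued_tasks`.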

def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
"""
165 changes: 131 additions & 34 deletions airflow/jobs/scheduler_job_runner.py
@@ -25,12 +25,14 @@
import sys
import time
from collections import Counter, defaultdict, deque
from contextlib import suppress
from datetime import timedelta
from functools import lru_cache, partial
from itertools import groupby
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator

from deprecated import deprecated
from sqlalchemy import and_, delete, exists, func, not_, select, text, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
@@ -40,7 +42,7 @@
from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest
from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
from airflow.configuration import conf
from airflow.exceptions import UnknownExecutorException
from airflow.exceptions import RemovedInAirflow3Warning, UnknownExecutorException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import Job, perform_heartbeat
@@ -99,6 +101,9 @@
DR = DagRun
DM = DagModel

TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule"
""":meta private:"""


class ConcurrencyMap:
"""
@@ -184,8 +189,15 @@ def __init__(
self._standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration")
self._task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")

self._enable_tracemalloc = conf.getboolean("scheduler", "enable_tracemalloc")

# this param is intentionally undocumented
self._num_stuck_queued_retries = conf.getint(
section="scheduler",
key="num_stuck_in_queued_retries",
fallback=2,
)

if self._enable_tracemalloc:
import tracemalloc

@@ -1046,7 +1058,7 @@ def _run_scheduler_loop(self) -> None:

timers.call_regular_interval(
conf.getfloat("scheduler", "task_queued_timeout_check_interval"),
self._fail_tasks_stuck_in_queued,
self._handle_tasks_stuck_in_queued,
)

timers.call_regular_interval(
@@ -1098,6 +1110,7 @@ def _run_scheduler_loop(self) -> None:
for executor in self.job.executors:
try:
# this is backcompat check if executor does not inherit from BaseExecutor
# todo: remove in airflow 3.0
if not hasattr(executor, "_task_event_logs"):
continue
with create_session() as session:
@@ -1767,48 +1780,132 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackReques
self.log.debug("callback is empty")

@provide_session
def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
"""
Mark tasks stuck in queued for longer than `task_queued_timeout` as failed.
Handle the scenario where a task is queued for longer than `task_queued_timeout`.

Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses
track of a task, a cluster can't further scale up its workers, etc.), but tasks
should not be stuck in queued for a long time. This will mark tasks stuck in
queued for longer than `self._task_queued_timeout` as failed. If the task has
available retries, it will be retried.
should not be stuck in queued for a long time.

We will attempt to requeue the task (by revoking it from the executor and setting it to
scheduled) up to `num_stuck_in_queued_retries` times (2 by default) before failing the task.
"""
self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued method")
tasks_stuck_in_queued = self._get_tis_stuck_in_queued(session)
for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
try:
for ti in stuck_tis:
executor.revoke_task(ti=ti)
self._maybe_requeue_stuck_ti(
ti=ti,
session=session,
)
except NotImplementedError:
# this block only gets entered if the executor has not implemented `revoke_task`.
# in which case, we try the fallback logic
# todo: remove the call to _stuck_in_queued_backcompat_logic in airflow 3.0.
# after 3.0, `cleanup_stuck_queued_tasks` will be removed, so we should
# just continue immediately.
self._stuck_in_queued_backcompat_logic(executor, stuck_tis)
continue
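
The dispatch in `_handle_tasks_stuck_in_queued` can be sketched outside Airflow as follows. This is an illustration under stated assumptions, not the scheduler's code: `LegacyExecutor`, `ModernExecutor`, and `handle_stuck` are hypothetical names, and task instances are replaced by plain strings. It shows that an executor without `revoke_task` raises on the first call, so nothing is requeued and the whole batch goes to the deprecated `cleanup_stuck_queued_tasks` hook instead.

```python
from contextlib import suppress


class LegacyExecutor:
    """Hypothetical executor that only implements the deprecated hook."""

    def revoke_task(self, *, ti):
        raise NotImplementedError

    def cleanup_stuck_queued_tasks(self, tis):
        return [repr(ti) for ti in tis]


class ModernExecutor:
    """Hypothetical executor that implements the new hook."""

    def __init__(self):
        self.revoked = []

    def revoke_task(self, *, ti):
        self.revoked.append(ti)


def handle_stuck(executor, stuck_tis, requeue):
    """Revoke and requeue each TI; fall back to the legacy hook if needed.

    Returns the readable TI strings reported by the legacy hook, or an
    empty list when the new interface handled everything.
    """
    try:
        for ti in stuck_tis:
            executor.revoke_task(ti=ti)
            requeue(ti)
    except NotImplementedError:
        # Only reached when `revoke_task` is not implemented.
        with suppress(NotImplementedError):
            return executor.cleanup_stuck_queued_tasks(tis=stuck_tis)
    return []
```

With the legacy path the executor itself fails the tasks, so the requeue counter is never consulted for those executors.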

tasks_stuck_in_queued = session.scalars(
def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]:
"""Query db for TIs that are stuck in queued."""
return session.scalars(
select(TI).where(
TI.state == TaskInstanceState.QUEUED,
TI.queued_dttm < (timezone.utcnow() - timedelta(seconds=self._task_queued_timeout)),
TI.queued_by_job_id == self.job.id,
)
).all()
)

for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
try:
cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
for ti in stuck_tis:
if repr(ti) in cleaned_up_task_instances:
self.log.warning(
"Marking task instance %s stuck in queued as failed. "
"If the task instance has available retries, it will be retried.",
ti,
)
session.add(
Log(
event="stuck in queued",
task_instance=ti.key,
extra=(
"Task will be marked as failed. If the task instance has "
"available retries, it will be retried."
),
)
)
except NotImplementedError:
self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
def _maybe_requeue_stuck_ti(self, *, ti, session):
"""
Requeue task if it has not been attempted too many times.

Otherwise, fail it.
"""
num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
if num_times_stuck < self._num_stuck_queued_retries:
self.log.info("Task stuck in queued; will try to requeue. task_id=%s", ti.task_id)
session.add(
Log(
event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
task_instance=ti.key,
extra=(
f"Task was in queued state for longer than {self._task_queued_timeout} "
"seconds; task state will be set back to scheduled."
),
)
)
self._reschedule_stuck_task(ti)
else:
self.log.info(
"Task requeue attempts exceeded max; marking failed. task_instance=%s",
ti,
)
session.add(
Log(
event="stuck in queued tries exceeded",
task_instance=ti.key,
extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.",
)
)
ti.set_state(TaskInstanceState.FAILED, session=session)

@deprecated(
reason="This is backcompat layer for older executor interface. Should be removed in 3.0",
category=RemovedInAirflow3Warning,
action="ignore",
)
def _stuck_in_queued_backcompat_logic(self, executor, stuck_tis):
"""
Try to invoke stuck in queued cleanup for older executor interface.

TODO: remove in airflow 3.0

Here we handle the case where the executor pre-dates the interface change that
introduced `revoke_task` and deprecated `cleanup_stuck_queued_tasks`.

"""
with suppress(NotImplementedError):
for ti_repr in executor.cleanup_stuck_queued_tasks(tis=stuck_tis):
self.log.warning(
"Task instance %s stuck in queued. Will be set to failed.",
ti_repr,
)

@provide_session
def _reschedule_stuck_task(self, ti, session=NEW_SESSION):
session.execute(
update(TI)
.where(TI.filter_for_tis([ti]))
.values(
state=TaskInstanceState.SCHEDULED,
queued_dttm=None,
)
.execution_options(synchronize_session=False)
)

@provide_session
def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int:
"""
Check the Log table to see how many times a taskinstance has been stuck in queued.

We can then use this information to determine whether to reschedule a task or fail it.
"""
return (
session.query(Log)
.where(
Log.task_id == ti.task_id,
Log.dag_id == ti.dag_id,
Log.run_id == ti.run_id,
Log.map_index == ti.map_index,
Log.try_number == ti.try_number,
Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
)
.count()
)
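
The requeue-or-fail decision made by `_maybe_requeue_stuck_ti` and `_get_num_times_stuck_in_queued` amounts to counting earlier reschedule events for the same try of the same task instance. A minimal sketch, assuming an in-memory list in place of the `Log` table (the `decide` helper and the tuple-based log are hypothetical; the event string and the limit of 2 are taken from the diff):

```python
RESCHEDULE_EVENT = "stuck in queued reschedule"  # same string as in the diff
MAX_REQUEUES = 2  # mirrors the config fallback shown in the diff


def decide(ti_key, event_log, max_requeues=MAX_REQUEUES):
    """Return "requeue" or "fail" for a task instance stuck in queued.

    `ti_key` is assumed to identify one try of one task instance, for example
    (dag_id, task_id, run_id, map_index, try_number). `event_log` is a list of
    (ti_key, event) tuples standing in for rows of the Log table.
    """
    times_stuck = sum(
        1 for key, event in event_log if key == ti_key and event == RESCHEDULE_EVENT
    )
    if times_stuck < max_requeues:
        # Record the reschedule so the next check sees it.
        event_log.append((ti_key, RESCHEDULE_EVENT))
        return "requeue"
    return "fail"


events = []
key = ("example_dag", "example_task", "run_1", -1, 1)
outcomes = [decide(key, events) for _ in range(3)]
```

Because the count is filtered on `try_number`, a task that is later retried through the normal retry mechanism starts again with a count of zero.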

@provide_session
def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
@@ -2167,7 +2264,7 @@ def _activate_assets_generate_warnings() -> Iterator[tuple[str, str]]:
session.add(warning)
existing_warned_dag_ids.add(warning.dag_id)

def _executor_to_tis(self, tis: list[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
def _executor_to_tis(self, tis: Iterable[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
"""Organize TIs into lists per their respective executor."""
_executor_to_tis: defaultdict[BaseExecutor, list[TaskInstance]] = defaultdict(list)
for ti in tis:
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
@@ -1375,6 +1375,7 @@ repos
repr
req
reqs
requeued
Reserialize
reserialize
reserialized
44 changes: 24 additions & 20 deletions providers/src/airflow/providers/celery/executors/celery_executor.py
@@ -35,6 +35,7 @@
from typing import TYPE_CHECKING, Any, Optional, Sequence, Tuple

from celery import states as celery_states
from deprecated import deprecated
from packaging.version import Version

from airflow import __version__ as airflow_version
@@ -52,7 +53,7 @@
lazy_load_command,
)
from airflow.configuration import conf
from airflow.exceptions import AirflowTaskTimeout
from airflow.exceptions import AirflowProviderDeprecationWarning, AirflowTaskTimeout
from airflow.executors.base_executor import BaseExecutor
from airflow.stats import Stats
from airflow.utils.state import TaskInstanceState
@@ -433,31 +434,34 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task

return not_adopted_tis

@deprecated(
reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.",
category=AirflowProviderDeprecationWarning,
)
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
"""
Handle remnants of tasks that were failed because they were stuck in queued.
Remove tasks stuck in queued from executor and fail them.

Tasks can get stuck in queued. If such a task is detected, it will be marked
as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
if it doesn't.

:param tis: List of Task Instances to clean up
:return: List of readable task instances for a warning message
This method is deprecated. Use `revoke_task` instead.
"""
readable_tis = []
reprs = []
for ti in tis:
reprs.append(repr(ti))
self.revoke_task(ti=ti)
self.fail(ti.key)
return reprs

def revoke_task(self, *, ti: TaskInstance):
from airflow.providers.celery.executors.celery_executor_utils import app

for ti in tis:
readable_tis.append(repr(ti))
task_instance_key = ti.key
self.fail(task_instance_key, None)
celery_async_result = self.tasks.pop(task_instance_key, None)
if celery_async_result:
try:
app.control.revoke(celery_async_result.task_id)
except Exception as ex:
self.log.error("Error revoking task instance %s from celery: %s", task_instance_key, ex)
return readable_tis
celery_async_result = self.tasks.pop(ti.key, None)
if celery_async_result:
try:
app.control.revoke(celery_async_result.task_id)
except Exception:
self.log.exception("Error revoking task instance %s from celery", ti.key)
self.running.discard(ti.key)
self.queued_tasks.pop(ti.key, None)

@staticmethod
def get_cli_commands() -> list[GroupCommand]: