Commit

Fix race condition in KubernetesExecutor with concurrently running schedulers

A race condition occurs in the _adopt_completed_pods function when multiple schedulers run concurrently. _adopt_completed_pods does not keep track of which scheduler went down, so it constantly tries to adopt completed pods from schedulers that are still working normally. On Airflow setups with concurrently running schedulers and many short-lived DAGs, this leads to a race condition and a leak of open executor slots. A detailed analysis of this situation is in the GitHub issue (apache#32928 (comment)). The _adopt_completed_pods function was refactored into _delete_orphaned_completed_pods, which deletes only completed pods that are not bound to a running scheduler.

Co-authored-by: Vlad Pastushenko <[email protected]>
2 people authored and romsharon98 committed Oct 28, 2024
1 parent 9772dbe commit be89540
Showing 2 changed files with 71 additions and 160 deletions.
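The heart of the change, condensed: instead of adopting Succeeded pods labeled with any other worker id (which races with healthy schedulers), the executor now looks up every scheduler whose Job row is RUNNING and asks Kubernetes only for Succeeded pods belonging to none of them. Below is a minimal sketch of the selector construction, assuming scheduler ids 10 and 20 are alive and that POD_EXECUTOR_DONE_KEY resolves to the literal label airflow_executor_done (the old test's expected selector further down this page shows that value); the real code additionally passes each id through _make_safe_label_value:

# Minimal sketch of the new pod query; ids 10 and 20 are assumed alive.
POD_EXECUTOR_DONE_KEY = "airflow_executor_done"
alive_schedulers_ids = [10, 20]  # in the real code this comes from the Job table

labels = ["kubernetes_executor=True", f"{POD_EXECUTOR_DONE_KEY}!=True"]
for alive_scheduler_id in alive_schedulers_ids:
    labels.append(f"airflow-worker!={alive_scheduler_id}")

query_kwargs = {"field_selector": "status.phase=Succeeded", "label_selector": ",".join(labels)}
print(query_kwargs["label_selector"])
# -> kubernetes_executor=True,airflow_executor_done!=True,airflow-worker!=10,airflow-worker!=20

Any pod still carrying the worker id of a live scheduler is excluded, so the deletion only ever touches pods orphaned by a dead scheduler.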
@@ -586,23 +586,8 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
             pod_list = self._list_pods(query_kwargs)
             for pod in pod_list:
                 self.adopt_launched_task(kube_client, pod, tis_to_flush_by_key)
-        self._adopt_completed_pods(kube_client)
-
-        # as this method can be retried within a short time frame
-        # (wrapped in a run_with_db_retries of scheduler_job_runner,
-        # and get retried due to an OperationalError, for example),
-        # there is a chance that in second attempt, adopt_launched_task will not be called even once
-        # as all pods are already adopted in the first attempt.
-        # and tis_to_flush_by_key will contain TIs that are already adopted.
-        # therefore, we need to check if the TIs are already adopted by the first attempt and remove them.
-        def _iter_tis_to_flush():
-            for key, ti in tis_to_flush_by_key.items():
-                if key in self.running:
-                    self.log.info("%s is already adopted, no need to flush.", ti)
-                else:
-                    yield ti
-
-        tis_to_flush.extend(_iter_tis_to_flush())
+        self._delete_orphaned_completed_pods()
+        tis_to_flush.extend(tis_to_flush_by_key.values())
         return tis_to_flush

     def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
@@ -677,40 +662,37 @@ def adopt_launched_task(
             del tis_to_flush_by_key[ti_key]
             self.running.add(ti_key)

-    def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
+    @provide_session
+    def _delete_orphaned_completed_pods(self, session: Session = NEW_SESSION) -> None:
         """
-        Patch completed pods so that the KubernetesJobWatcher can delete them.
+        Delete orphaned completed pods with completed TaskInstances.

-        :param kube_client: kubernetes client for speaking to kube API
+        Pods that have reached the Completed status are usually deleted by the scheduler to which
+        they are attached. In case when the scheduler crashes, there is no one to delete these
+        pods. Therefore, they are deleted from another scheduler using this function.
         """
+        from airflow.jobs.job import Job, JobState
+
         if TYPE_CHECKING:
-            assert self.scheduler_job_id
+            assert self.kube_scheduler

-        new_worker_id_label = self._make_safe_label_value(self.scheduler_job_id)
-        query_kwargs = {
-            "field_selector": "status.phase=Succeeded",
-            "label_selector": (
-                "kubernetes_executor=True,"
-                f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True"
-            ),
-        }
+        alive_schedulers_ids = session.scalars(
+            select(Job.id).where(Job.job_type == "SchedulerJob", Job.state == JobState.RUNNING)
+        ).all()
+        labels = ["kubernetes_executor=True", f"{POD_EXECUTOR_DONE_KEY}!=True"]
+        for alive_scheduler_id in alive_schedulers_ids:
+            labels.append(f"airflow-worker!={self._make_safe_label_value(str(alive_scheduler_id))}")
+
+        query_kwargs = {"field_selector": "status.phase=Succeeded", "label_selector": ",".join(labels)}
         pod_list = self._list_pods(query_kwargs)
         for pod in pod_list:
-            self.log.info("Attempting to adopt pod %s", pod.metadata.name)
             from kubernetes.client.rest import ApiException

             try:
-                kube_client.patch_namespaced_pod(
-                    name=pod.metadata.name,
-                    namespace=pod.metadata.namespace,
-                    body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}},
-                )
+                self.kube_scheduler.delete_pod(pod_name=pod.metadata.name, namespace=pod.metadata.namespace)
+                self.log.info("Orphaned completed pod %s has been deleted", pod.metadata.name)
             except ApiException as e:
-                self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
-                continue
-
-            ti_id = annotations_to_key(pod.metadata.annotations)
-            self.running.add(ti_id)
+                self.log.info("Failed to delete orphaned completed pod %s. Reason: %s", pod.metadata.name, e)

     def _flush_task_queue(self) -> None:
         if TYPE_CHECKING:
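A note on the @provide_session pattern used above: when the caller does not pass a session, Airflow's decorator creates one and injects it, which is why try_adopt_task_instances can call the new helper with no arguments. A rough standalone sketch of the same alive-scheduler query, assuming the airflow.utils.session helpers (illustrative only, not part of the commit):

from __future__ import annotations

from sqlalchemy import select

from airflow.jobs.job import Job, JobState
from airflow.utils.session import NEW_SESSION, provide_session


@provide_session
def alive_scheduler_ids(session=NEW_SESSION) -> list[int]:
    # Mirrors the query in _delete_orphaned_completed_pods: ids of all
    # SchedulerJob rows currently in the RUNNING state.
    return list(
        session.scalars(
            select(Job.id).where(Job.job_type == "SchedulerJob", Job.state == JobState.RUNNING)
        )
    )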
169 changes: 49 additions & 120 deletions providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -28,13 +28,8 @@
 from kubernetes.client.rest import ApiException
 from urllib3 import HTTPResponse

-from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
-from airflow.executors.executor_constants import (
-    CELERY_EXECUTOR,
-    CELERY_KUBERNETES_EXECUTOR,
-    KUBERNETES_EXECUTOR,
-)
-from airflow.models.taskinstance import TaskInstance
+from airflow.exceptions import AirflowException
+from airflow.jobs.job import Job, JobState
 from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.cncf.kubernetes import pod_generator
@@ -887,11 +882,9 @@ def test_change_state_failed_pod_deletion(
         "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task"
     )
     @mock.patch(
-        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
+        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._delete_orphaned_completed_pods"
     )
-    def test_try_adopt_task_instances(
-        self, mock_adopt_completed_pods, mock_adopt_launched_task, mock_kube_dynamic_client
-    ):
+    def test_try_adopt_task_instances(self, mock_delete_orphaned_completed_pods, mock_adopt_launched_task):
         executor = self.kubernetes_executor
         executor.scheduler_job_id = "10"
         ti_key = annotations_to_key(
Expand Down Expand Up @@ -921,13 +914,13 @@ def test_try_adopt_task_instances(
header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)
mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {ti_key: mock_ti})
mock_adopt_completed_pods.assert_called_once()
mock_delete_orphaned_completed_pods.assert_called_once()
assert reset_tis == [mock_ti] # assume failure adopting when checking return

# Second adoption (queued_by_job_id and external_executor_id no longer match)
mock_kube_dynamic_client.return_value.reset_mock()
mock_adopt_launched_task.reset_mock()
mock_adopt_completed_pods.reset_mock()
mock_delete_orphaned_completed_pods.reset_mock()

mock_ti.queued_by_job_id = "10" # scheduler_job would have updated this after the first adoption
executor.scheduler_job_id = "20"
@@ -945,16 +938,13 @@ def test_try_adopt_task_instances(
             header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
         )
         mock_adopt_launched_task.assert_called_once()  # Won't check args this time around as they get mutated
-        mock_adopt_completed_pods.assert_called_once()
+        mock_delete_orphaned_completed_pods.assert_called_once()
         assert reset_tis == []  # This time our return is empty - no TIs to reset

-    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
     @mock.patch(
-        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
+        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._delete_orphaned_completed_pods"
     )
-    def test_try_adopt_task_instances_multiple_scheduler_ids(
-        self, mock_adopt_completed_pods, mock_kube_dynamic_client
-    ):
+    def test_try_adopt_task_instances_multiple_scheduler_ids(self, mock_delete_orphaned_completed_pods):
         """We try to find pods only once per scheduler id"""
         executor = self.kubernetes_executor
         mock_kube_client = mock.MagicMock()
@@ -1000,10 +990,10 @@ def test_try_adopt_task_instances_multiple_scheduler_ids(
         "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task"
     )
     @mock.patch(
-        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
+        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._delete_orphaned_completed_pods"
     )
     def test_try_adopt_task_instances_no_matching_pods(
-        self, mock_adopt_completed_pods, mock_adopt_launched_task, mock_kube_dynamic_client
+        self, mock_delete_orphaned_completed_pods, mock_adopt_launched_task
     ):
         executor = self.kubernetes_executor
         mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", dag_id="dag", task_id="task")
@@ -1016,33 +1006,7 @@ def test_try_adopt_task_instances_no_matching_pods(
         assert tis_to_flush == [mock_ti]
         assert executor.running == set()
         mock_adopt_launched_task.assert_not_called()
-        mock_adopt_completed_pods.assert_called_once()
-
-    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @mock.patch(
-        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task"
-    )
-    @mock.patch(
-        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
-    )
-    def test_try_adopt_already_adopted_task_instances(
-        self, mock_adopt_completed_pods, mock_adopt_launched_task, mock_kube_dynamic_client
-    ):
-        """For TIs that are already adopted, we should not flush them"""
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.get.return_value.items = []
-        mock_kube_client = mock.MagicMock()
-        executor = self.kubernetes_executor
-        executor.kube_client = mock_kube_client
-        ti_key = TaskInstanceKey("dag", "task", "run_id", 1)
-        mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", key=ti_key)
-        executor.running = {ti_key}
-
-        tis_to_flush = executor.try_adopt_task_instances([mock_ti])
-        mock_adopt_launched_task.assert_not_called()
-        mock_adopt_completed_pods.assert_called_once()
-        assert tis_to_flush == []
-        assert executor.running == {ti_key}
+        mock_delete_orphaned_completed_pods.assert_called_once()

     @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
     def test_adopt_launched_task(self, mock_kube_client):
@@ -1094,97 +1058,62 @@ def test_adopt_launched_task_api_exception(self, mock_kube_client):
         assert tis_to_flush_by_key == {ti_key: {}}
         assert executor.running == set()

-    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+    @pytest.mark.db_test
+    @mock.patch(
+        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod"
+    )
     @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
-    def test_adopt_completed_pods(self, mock_kube_client, mock_kube_dynamic_client):
-        """We should adopt all completed pods from other schedulers"""
+    def test_delete_orphaned_completed_pods(self, mock_get_kube_client, mock_delete_pod, session):
+        """We should delete all completed pods from failed schedulers"""
         executor = self.kubernetes_executor
-        executor.scheduler_job_id = "modified"
-        executor.kube_client = mock_kube_client
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.return_value.items = []
         executor.kube_config.kube_namespace = "somens"
-        pod_names = ["one", "two"]

-        def get_annotations(pod_name):
-            return {
-                "dag_id": "dag",
-                "run_id": "run_id",
-                "task_id": pod_name,
-                "try_number": "1",
-            }
+        running_job = Job(executor=executor, job_type="SchedulerJob", state=JobState.RUNNING)
+        failed_job = Job(executor=executor, job_type="SchedulerJob", state=JobState.FAILED)
+        session.add_all([running_job, failed_job])
+        session.commit()

-        mock_kube_dynamic_client.return_value.get.return_value.items = [
+        mock_kube_client = mock_get_kube_client.return_value
+        pods_args = [
+            {"name": "one", "worker_id": running_job.id},
+            {"name": "two", "worker_id": failed_job.id},
+        ]
+        mock_kube_client.list_namespaced_pod.return_value.items = [
             k8s.V1Pod(
                 metadata=k8s.V1ObjectMeta(
-                    name=pod_name,
-                    labels={"airflow-worker": pod_name},
-                    annotations=get_annotations(pod_name),
+                    name=pod_args["name"],
+                    labels={"airflow-worker": pod_args["worker_id"]},
                     namespace="somens",
                 )
             )
-            for pod_name in pod_names
+            for pod_args in pods_args
         ]
-        expected_running_ti_keys = {annotations_to_key(get_annotations(pod_name)) for pod_name in pod_names}
+        executor.kube_client = mock_kube_client

-        executor._adopt_completed_pods(mock_kube_client)
-        mock_kube_dynamic_client.return_value.get.assert_called_once_with(
-            resource=mock_pod_resource,
+        executor.start()
+        try:
+            executor._delete_orphaned_completed_pods()
+        finally:
+            executor.end()
+
+        expected_labels = [
+            "kubernetes_executor=True",
+            f"{POD_EXECUTOR_DONE_KEY}!=True",
+            f"airflow-worker!={running_job.id}",
+        ]
+        mock_kube_client.list_namespaced_pod.assert_called_once_with(
             namespace="somens",
             field_selector="status.phase=Succeeded",
-            label_selector="kubernetes_executor=True,airflow-worker!=modified,airflow_executor_done!=True",
-            header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
+            label_selector=",".join(expected_labels),
         )
-        assert len(pod_names) == mock_kube_client.patch_namespaced_pod.call_count
-        mock_kube_client.patch_namespaced_pod.assert_has_calls(
+
+        mock_delete_pod.assert_has_calls(
             [
-                mock.call(
-                    body={"metadata": {"labels": {"airflow-worker": "modified"}}},
-                    name=pod_name,
-                    namespace="somens",
-                )
-                for pod_name in pod_names
+                mock.call(pod_name="one", namespace="somens"),
+                mock.call(pod_name="two", namespace="somens"),
             ],
             any_order=True,
         )
-        assert executor.running == expected_running_ti_keys

-    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
-    def test_adopt_completed_pods_api_exception(self, mock_kube_client, mock_kube_dynamic_client):
-        """We should gracefully handle exceptions when adopting completed pods from other schedulers"""
-        executor = self.kubernetes_executor
-        executor.scheduler_job_id = "modified"
-        executor.kube_client = mock_kube_client
-        executor.kube_config.kube_namespace = "somens"
-        pod_names = ["one", "two"]
-
-        def get_annotations(pod_name):
-            return {
-                "dag_id": "dag",
-                "run_id": "run_id",
-                "task_id": pod_name,
-                "try_number": "1",
-            }
-
-        mock_kube_dynamic_client.return_value.get.return_value.items = [
-            k8s.V1Pod(
-                metadata=k8s.V1ObjectMeta(
-                    name=pod_name,
-                    labels={"airflow-worker": pod_name},
-                    annotations=get_annotations(pod_name),
-                    namespace="somens",
-                )
-            )
-            for pod_name in pod_names
-        ]
-
-        mock_kube_client.patch_namespaced_pod.side_effect = ApiException(status=400)
-        executor._adopt_completed_pods(mock_kube_client)
-        assert len(pod_names) == mock_kube_client.patch_namespaced_pod.call_count
-        assert executor.running == set()

     @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
     def test_not_adopt_unassigned_task(self, mock_kube_client):
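The old test_adopt_completed_pods_api_exception test was removed without a direct replacement. A hypothetical sketch of an equivalent exception-path test for the new helper, reusing the mocking style of test_delete_orphaned_completed_pods above (illustrative only, not part of this commit):

    @pytest.mark.db_test
    @mock.patch(
        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod"
    )
    @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
    def test_delete_orphaned_completed_pods_api_exception(self, mock_get_kube_client, mock_delete_pod):
        """A failed deletion should be logged and skipped, not abort the remaining pods."""
        executor = self.kubernetes_executor
        executor.kube_config.kube_namespace = "somens"
        mock_kube_client = mock_get_kube_client.return_value
        mock_kube_client.list_namespaced_pod.return_value.items = [
            k8s.V1Pod(metadata=k8s.V1ObjectMeta(name=name, labels={"airflow-worker": "0"}, namespace="somens"))
            for name in ("one", "two")
        ]
        executor.kube_client = mock_kube_client
        mock_delete_pod.side_effect = ApiException(status=400)

        executor.start()
        try:
            executor._delete_orphaned_completed_pods()  # should log each failure and continue, not raise
        finally:
            executor.end()

        assert mock_delete_pod.call_count == 2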
