Skip to content

Commit

Permalink
[core][experimental] Avoid false positives in deadlock detection (ray-project#47912)
Browse files Browse the repository at this point in the history

Signed-off-by: Kai-Hsun Chen <[email protected]>
Signed-off-by: ujjawal-khare <[email protected]>
  • Loading branch information
kevin85421 authored and ujjawal-khare committed Oct 15, 2024
1 parent 66b6ffa commit fd032fe
Showing 1 changed file with 1 addition and 50 deletions.
51 changes: 1 addition & 50 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from collections import defaultdict, deque
from collections import defaultdict
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Tuple, Union, Optional, Set
import logging
Expand Down Expand Up @@ -911,34 +911,6 @@ def _preprocess(self) -> None:
)
direct_input = True

elif (
isinstance(upstream_task.dag_node, ClassMethodNode)
and upstream_task.dag_node.is_class_method_call
):
from ray.dag.constants import RAY_ADAG_ENABLE_DETECT_DEADLOCK

if (
# Ray aDAG deadlock detection has the same check, but
# it may be turned off because of false positives.
# In that case, we need this check to be active.
# TODO: When we clean up Ray aDAG deadlock detection
# this check should be done at one place only.
not RAY_ADAG_ENABLE_DETECT_DEADLOCK
and downstream_actor_handle is not None
and downstream_actor_handle
== upstream_task.dag_node._get_actor_handle()
and upstream_task.dag_node.type_hint.requires_nccl()
):
raise ValueError(
"Compiled DAG does not support NCCL communication between "
"methods on the same actor. NCCL type hint is specified "
"for the channel from method "
f"{upstream_task.dag_node.get_method_name()} to method "
f"{dag_node.get_method_name()} on actor "
f"{downstream_actor_handle}. Please remove the NCCL "
"type hint between these methods."
)

upstream_task.downstream_task_idxs[task_idx] = downstream_actor_handle
task.arg_type_hints.append(upstream_task.dag_node.type_hint)

Expand Down Expand Up @@ -1514,27 +1486,6 @@ def _detect_deadlock(self) -> bool:

from ray.dag import ClassMethodNode

def _get_next_task_idx(task: "CompiledTask") -> Optional[int]:
    """Find the task bound immediately after ``task`` on the same actor.

    The lookup scans ``self.actor_to_tasks`` for the actor handle of
    ``task`` and picks the first entry whose bind index is exactly one
    greater than that of ``task``.

    Args:
        task: The compiled task whose successor is wanted.

    Returns:
        The ``idx`` of the matching task, or ``None`` when ``task`` is not
        a ``ClassMethodNode``, is a class-method output node, or has no
        task with the next bind index on its actor.
    """
    node = task.dag_node
    # Only plain actor-method calls take part in the bind-index chain.
    if not isinstance(node, ClassMethodNode) or node.is_class_method_output:
        return None

    actor_handle = node._get_actor_handle()
    # Loop-invariant: compute the wanted bind index once.
    next_bind_index = node._get_bind_index() + 1
    return next(
        (
            candidate.idx
            for candidate in self.actor_to_tasks[actor_handle]
            if candidate.dag_node._get_bind_index() == next_bind_index
        ),
        None,
    )

def _add_edge(
    graph: Dict[int, GraphNode], from_idx: int, to_idx: Optional[int]
):
    """Record a directed edge ``from_idx -> to_idx`` in ``graph``.

    Both endpoints are updated: ``to_idx`` is added to the source node's
    ``out_edges`` and ``from_idx`` to the destination node's ``in_edges``.

    Args:
        graph: Mapping from task index to its graph node; both endpoints
            must already be present as keys.
        from_idx: Index of the edge's source node.
        to_idx: Index of the edge's destination node. ``None`` means
            there is no destination, and the call is a no-op.
    """
    if to_idx is not None:
        graph[from_idx].out_edges.add(to_idx)
        graph[to_idx].in_edges.add(from_idx)

def _is_same_actor(idx1: int, idx2: int) -> bool:
"""
Args:
Expand Down

0 comments on commit fd032fe

Please sign in to comment.