Enable specifying hostname(s) in DragonRunRequests
ankona committed Jul 31, 2024
1 parent e1f0e07 commit 81a59b0
Showing 10 changed files with 911 additions and 20 deletions.
1 change: 1 addition & 0 deletions doc/changelog.md
@@ -13,6 +13,7 @@ Jump to:

Description

- Enable hostname selection for dragon tasks
- Enable dynamic feature store selection
- Adjust schemas for better performance
- Add TorchWorker first implementation and mock inference app example
161 changes: 148 additions & 13 deletions smartsim/_core/launcher/dragon/dragonBackend.py
@@ -45,6 +45,8 @@
import dragon.native.process_group as dragon_process_group
import dragon.native.machine as dragon_machine

from smartsim._core.launcher.dragon.pqueue import NodePrioritizer

# pylint: enable=import-error
# isort: on
from ...._core.config import get_config
@@ -69,6 +71,10 @@
logger = get_logger(__name__)


# tracking structure for [num_refs, node_name, is_dirty]
_NodeRefCount = t.List[t.Union[int, str, bool]]
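
The alias above is a positional record rather than a class, so each field is accessed by index. A minimal sketch of how such an entry could be read and updated, assuming the field order `[num_refs, node_name, is_dirty]` from the comment; the `_increment` helper is hypothetical, not part of this diff:

```python
import typing as t

# tracking structure for [num_refs, node_name, is_dirty]
_NodeRefCount = t.List[t.Union[int, str, bool]]


def _increment(entry: _NodeRefCount) -> None:
    """Hypothetical helper: bump the reference count and mark the entry dirty."""
    entry[0] = int(entry[0]) + 1  # num_refs
    entry[2] = True  # is_dirty


node: _NodeRefCount = [0, "node001", False]
_increment(node)
assert node == [1, "node001", True]
```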


class DragonStatus(str, Enum):
ERROR = str(dragon_group_state.Error())
RUNNING = str(dragon_group_state.Running())
@@ -190,21 +196,33 @@ def __init__(self, pid: int) -> None:
self._view = DragonBackendView(self)
logger.debug(self._view.host_desc)
self._infra_ddict: t.Optional[dragon_ddict.DDict] = None
self._prioritizer = NodePrioritizer(self._nodes, self._queue_lock)

@property
def hosts(self) -> list[str]:
with self._queue_lock:
return self._hosts

# todo: remove
@property
def free_hosts(self) -> t.Deque[str]: # todo: swap to dict[str, str]:
with self._queue_lock:
return self._free_hosts

# todo: remove
@property
def allocated_hosts(self) -> dict[str, str]:
with self._queue_lock:
return self._allocated_hosts

@property
def assigned_steps(self) -> dict[str, str]:
return self._assigned_steps

# @property
# def free_hosts(self) -> t.List[str]:
# with self._queue_lock:
# return [str(item[1]) for item in self._prioritizer.unassigned()]

@property
def group_infos(self) -> dict[str, ProcessGroupInfo]:
@@ -220,12 +238,23 @@ def _initialize_hosts(self) -> None:
self._cpus = [node.num_cpus for node in self._nodes]
self._gpus = [node.num_gpus for node in self._nodes]

"""List of hosts available in allocation"""
self._free_hosts: t.Deque[str] = collections.deque(self._hosts)
# """List of hosts available in allocation"""
self._free_hosts: t.Deque[str] = collections.deque(
self._hosts
) # todo: remove
"""List of hosts on which steps can be launched"""
self._allocated_hosts: t.Dict[str, str] = {}
# todo: replace self._allocated_hosts w/self._assigned_steps
self._assigned_steps: t.Dict[str, str] = {}
"""Mapping of hosts on which a step is already running to step ID"""

self._ref_map: t.Dict[str, _NodeRefCount] = {}
"""Map node names to a ref counter for direct access"""
self._cpu_refs: t.List[_NodeRefCount] = []
"""Track reference counts to CPU-only nodes"""
self._gpu_refs: t.List[_NodeRefCount] = []
"""Track reference counts to GPU nodes"""

def __str__(self) -> str:
return self.status_message

@@ -305,23 +334,18 @@ def _can_honor_policy(
# make sure some node has enough CPUs
available = max(self._cpus)
requested = max(request.policy.cpu_affinity)

if requested >= available:
return False, "Cannot satisfy request, not enough CPUs available"

if request.policy.gpu_affinity:
# make sure some node has enough GPUs
available = max(self._gpus)
requested = max(request.policy.gpu_affinity)

if requested >= available:
return False, "Cannot satisfy request, not enough GPUs available"

return True, None

def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]]:
"""Check if request can be honored with resources available in the allocation.
Currently, this only checks the total number of nodes; in the future
it will also consider other constraints such as memory, accelerators,
and so on.
@@ -339,20 +363,129 @@ def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]
return False, err

return True, None
# honorable, err = self._can_honor_hosts(request)
# if not honorable:
# return False, err

# honorable, err = self._can_honor_state(request)
# if not honorable:
# return False, err

# honorable, err = self._can_honor_affinities(request)
# if not honorable:
# return False, err

# return True, None

def _can_honor_affinities(
self, request: DragonRunRequest
) -> t.Tuple[bool, t.Optional[str]]:
"""Check if the policy can be honored with resources available
in the allocation.
:param request: the DragonRunRequest to verify
:returns: Tuple indicating if the request can be honored and
an optional error message"""
if request.policy:
if request.policy.cpu_affinity:
# make sure some node has enough CPUs
available = max(self._cpus)
requested = max(request.policy.cpu_affinity)

if requested >= available:
return False, "Cannot satisfy request, not enough CPUs available"

if request.policy.gpu_affinity:
# make sure some node has enough GPUs
available = max(self._gpus)
requested = max(request.policy.gpu_affinity)

if requested >= available:
return False, "Cannot satisfy request, not enough GPUs available"

return True, None
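
The `requested >= available` comparison above works because affinity values are zero-based device indices while `available` is a device count, so requesting CPU 8 on an 8-CPU node must fail. A minimal standalone restatement of the same check (function name is ours, not from the diff):

```python
import typing as t


def affinity_ok(affinity: t.List[int], devices_per_node: int) -> bool:
    # zero-based index `devices_per_node - 1` is the largest valid device
    return not affinity or max(affinity) < devices_per_node


assert affinity_ok([0, 3, 7], 8)  # CPU 7 exists on an 8-CPU node
assert not affinity_ok([8], 8)    # CPU 8 does not (indices run 0-7)
```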

def _can_honor_hosts(
self, request: DragonRunRequest
) -> t.Tuple[bool, t.Optional[str]]:
"""Check if the current state of the backend process inhibits executing
the request.
:param request: the DragonRunRequest to verify
:returns: Tuple indicating if the request can be honored and
an optional error message"""
if request.nodes > len(self._hosts):
message = f"Cannot satisfy request. Requested {request.nodes} nodes, "
message += f"but only {len(self._hosts)} nodes are available."
return False, message

requested_hosts: t.Set[str] = set()
if request.hostlist:
requested_hosts = set(request.hostlist)
if not requested_hosts:
return True, None

all_hosts = set(self._hosts)
valid_hosts = all_hosts.intersection(requested_hosts)

# don't worry about count when only hostnames are supplied (req.nodes == 0)
if request.nodes > len(valid_hosts):
message = f"Cannot satisfy request. Requested {request.nodes} nodes, "
message += f"but only {len(valid_hosts)} named hosts are available."
return False, message

return True, None
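
To illustrate the behavior above: a supplied hostlist is intersected with the allocation, and the node count is then enforced only against the named hosts; per the inline comment, `request.nodes == 0` skips the count check entirely. A standalone restatement of the check (not the class method itself):

```python
import typing as t


def can_honor_hosts(
    nodes: int, hostlist: t.List[str], allocation: t.Set[str]
) -> t.Tuple[bool, t.Optional[str]]:
    if nodes > len(allocation):
        return False, f"Requested {nodes} nodes, only {len(allocation)} available."
    if not hostlist:
        return True, None
    # only hosts that actually exist in the allocation count
    valid = allocation.intersection(hostlist)
    if nodes > len(valid):
        return False, f"Requested {nodes} nodes, only {len(valid)} named hosts."
    return True, None


alloc = {"node001", "node002", "node003"}
assert can_honor_hosts(2, ["node001", "node002"], alloc)[0]
assert not can_honor_hosts(2, ["node001", "nodeX"], alloc)[0]  # one bad name
assert can_honor_hosts(0, ["node001"], alloc)[0]  # nodes == 0 skips the count
```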

def _can_honor_state(
self, _request: DragonRunRequest
) -> t.Tuple[bool, t.Optional[str]]:
"""Check if the current state of the backend process inhibits executing
the request.
:param _request: the DragonRunRequest to verify
:returns: Tuple indicating if the request can be honored and
an optional error message"""
if self._shutdown_requested:
message = "Cannot satisfy request, server is shutting down."
return False, message

return True, None

def _allocate_step(
self, step_id: str, request: DragonRunRequest
) -> t.Optional[t.List[str]]:
"""Identify the hosts on which the request will be executed
:param step_id: The identifier of a step that will be executed on the host
:param request: The request to be executed
:returns: A list of selected hostnames"""
# ensure at least one host is selected
num_hosts: int = request.nodes # or 1 # todo - make at least 1 again
with self._queue_lock:
# # ensure positive integers in valid range are accepted
# if num_hosts <= 0 or num_hosts > len(self._hosts):
# return None

# if request.hostlist:
# tracking_info = self._prioritizer.next_n_from(
# num_hosts, [host for host in request.hostlist.split(",") if host]
# )
# else:
# tracking_info = self._prioritizer.next_n(num_hosts)

# # give back the list of node names
# to_allocate = [str(info[1]) for info in tracking_info]

# for host in to_allocate:
# self._assigned_steps[host] = step_id

# return to_allocate
if num_hosts <= 0 or num_hosts > len(self._free_hosts):
return None

to_allocate = []
for _ in range(num_hosts):
host = self._free_hosts.popleft()
self._allocated_hosts[host] = step_id
to_allocate.append(host)

return to_allocate
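
The commented-out block above sketches where this method is headed: allocation through the NodePrioritizer instead of the free-host deque. Joining those comments up gives the variant below; note that the `next_n`/`next_n_from` signatures and their returning `_NodeRefCount` entries are inferred from the commented code, since pqueue.py is not rendered on this page:

```python
def _allocate_step(
    self, step_id: str, request: DragonRunRequest
) -> t.Optional[t.List[str]]:
    """Prioritizer-backed variant reconstructed from the commented block."""
    num_hosts: int = request.nodes
    with self._queue_lock:
        # ensure positive integers in valid range are accepted
        if num_hosts <= 0 or num_hosts > len(self._hosts):
            return None

        if request.hostlist:
            # hostlist arrives as a comma-separated string
            tracking_info = self._prioritizer.next_n_from(
                num_hosts, [host for host in request.hostlist.split(",") if host]
            )
        else:
            tracking_info = self._prioritizer.next_n(num_hosts)

        # each entry is a _NodeRefCount; index 1 holds the node name
        to_allocate = [str(info[1]) for info in tracking_info]

        for host in to_allocate:
            self._assigned_steps[host] = step_id

        return to_allocate
```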

@staticmethod
@@ -641,10 +774,12 @@ def _refresh_statuses(self) -> None:
for host in group_info.hosts:
logger.debug(f"Releasing host {host}")
try:
self._allocated_hosts.pop(host) # todo: remove
self._assigned_steps.pop(host)
self._prioritizer.decrement(host)
except KeyError:
logger.error(f"Tried to free a non-allocated host: {host}")
self._free_hosts.append(host) # todo: remove
group_info.process_group = None
group_info.redir_workers = None

2 changes: 2 additions & 0 deletions smartsim/_core/launcher/dragon/dragonLauncher.py
@@ -170,6 +170,7 @@ def run(self, step: Step) -> t.Optional[str]:
merged_env = self._connector.merge_persisted_env(os.environ.copy())
nodes = int(run_args.get("nodes", None) or 1)
tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)
# hosts = str(run_args.get("host-list", ""))

policy = DragonRunPolicy.from_run_args(run_args)

@@ -187,6 +188,7 @@ def run(self, step: Step) -> t.Optional[str]:
output_file=out,
error_file=err,
policy=policy,
# hostlist=",".join(policy.hostlist),
)
),
DragonRunResponse,
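
The two commented lines in this file hint at the launcher-side plumbing: reading a `host-list` run argument and forwarding it on the request. A speculative sketch of how that argument might be normalized before being placed on `DragonRunRequest`; the `host-list` key and `hostlist` field names come from the commented code, and the helper itself is hypothetical:

```python
import typing as t


def build_hostlist_arg(run_args: t.Dict[str, t.Any]) -> str:
    """Normalize the (hypothetical) `host-list` run argument into the
    comma-separated string the request is expected to carry."""
    raw = run_args.get("host-list", "") or ""
    if isinstance(raw, (list, tuple)):
        return ",".join(str(host) for host in raw)
    return str(raw)


assert build_hostlist_arg({"host-list": ["node001", "node002"]}) == "node001,node002"
assert build_hostlist_arg({}) == ""
```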
(diffs for the remaining 7 changed files not shown)
