Skip to content

Commit

Permalink
Enable specifying hostname(s) in DragonRunRequests
Browse files Browse the repository at this point in the history
  • Loading branch information
ankona committed Jul 31, 2024
1 parent b708555 commit f42bbfa
Show file tree
Hide file tree
Showing 11 changed files with 912 additions and 22 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/run_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ jobs:
- name: Install SmartSim (with ML backends)
run: |
python -m pip install git+https://github.com/CrayLabs/SmartRedis.git@develop#egg=smartredis
python -m pip install .[dev,ml]
python -m pip install .[dev,mypy,ml]
- name: Install ML Runtimes with Smart (with pt, tf, and onnx support)
if: (contains( matrix.os, 'ubuntu' ) || contains( matrix.os, 'macos-12')) && ( matrix.subset != 'dragon' )
Expand All @@ -127,7 +127,6 @@ jobs:

- name: Run mypy
run: |
python -m pip install .[mypy]
make check-mypy
- name: Run Pylint
Expand Down
1 change: 1 addition & 0 deletions doc/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Jump to:

Description

- Enable hostname selection for dragon tasks
- Enable dynamic feature store selection
- Adjust schemas for better performance
- Add TorchWorker first implementation and mock inference app example
Expand Down
161 changes: 148 additions & 13 deletions smartsim/_core/launcher/dragon/dragonBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import dragon.native.process_group as dragon_process_group
import dragon.native.machine as dragon_machine

from smartsim._core.launcher.dragon.pqueue import NodePrioritizer

# pylint: enable=import-error
# isort: on
from ...._core.config import get_config
Expand All @@ -69,6 +71,10 @@
logger = get_logger(__name__)


# tracking structure for [num_refs, node_name, is_dirty]
_NodeRefCount = t.List[t.Union[int, str, bool]]


class DragonStatus(str, Enum):
ERROR = str(dragon_group_state.Error())
RUNNING = str(dragon_group_state.Running())
Expand Down Expand Up @@ -190,21 +196,33 @@ def __init__(self, pid: int) -> None:
self._view = DragonBackendView(self)
logger.debug(self._view.host_desc)
self._infra_ddict: t.Optional[dragon_ddict.DDict] = None
self._prioritizer = NodePrioritizer(self._nodes, self._queue_lock)

@property
def hosts(self) -> list[str]:
with self._queue_lock:
return self._hosts

# todo: remove
@property
def free_hosts(self) -> t.Deque[str]: # todo: swap to dict[str, str]:
with self._queue_lock:
return self._free_hosts

# todo: remove
@property
def allocated_hosts(self) -> dict[str, str]:
with self._queue_lock:
return self._allocated_hosts

@property
def free_hosts(self) -> t.Deque[str]:
with self._queue_lock:
return self._free_hosts
def assigned_steps(self) -> dict[str, str]:
return self._assigned_steps

# @property
# def free_hosts(self) -> t.List[str]:
# with self._queue_lock:
# return [str(item[1]) for item in self._prioritizer.unassigned()]

@property
def group_infos(self) -> dict[str, ProcessGroupInfo]:
Expand All @@ -220,12 +238,23 @@ def _initialize_hosts(self) -> None:
self._cpus = [node.num_cpus for node in self._nodes]
self._gpus = [node.num_gpus for node in self._nodes]

"""List of hosts available in allocation"""
self._free_hosts: t.Deque[str] = collections.deque(self._hosts)
# """List of hosts available in allocation"""
self._free_hosts: t.Deque[str] = collections.deque(
self._hosts
) # todo: remove
"""List of hosts on which steps can be launched"""
self._allocated_hosts: t.Dict[str, str] = {}
# todo: replace self._allocated_hosts w/self._assigned_steps
self._assigned_steps: t.Dict[str, str] = {}
"""Mapping of hosts on which a step is already running to step ID"""

self._ref_map: t.Dict[str, _NodeRefCount] = {}
"""Map node names to a ref counter for direct access"""
self._cpu_refs: t.List[_NodeRefCount] = []
"""Track reference counts to CPU-only nodes"""
self._gpu_refs: t.List[_NodeRefCount] = []
"""Track reference counts to GPU nodes"""

def __str__(self) -> str:
return self.status_message

Expand Down Expand Up @@ -305,23 +334,18 @@ def _can_honor_policy(
# make sure some node has enough CPUs
available = max(self._cpus)
requested = max(request.policy.cpu_affinity)

if requested >= available:
return False, "Cannot satisfy request, not enough CPUs available"

if request.policy.gpu_affinity:
# make sure some node has enough GPUs
available = max(self._gpus)
requested = max(request.policy.gpu_affinity)

if requested >= available:
return False, "Cannot satisfy request, not enough GPUs available"

return True, None

def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]]:
"""Check if request can be honored with resources available in the allocation.
Currently only checks for total number of nodes,
in the future it will also look at other constraints
such as memory, accelerators, and so on.
Expand All @@ -339,20 +363,129 @@ def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]
return False, err

return True, None
# honorable, err = self._can_honor_hosts(request)
# if not honorable:
# return False, err

# honorable, err = self._can_honor_state(request)
# if not honorable:
# return False, err

# honorable, err = self._can_honor_affinities(request)
# if not honorable:
# return False, err

# return True, None

def _can_honor_affinities(
self, request: DragonRunRequest
) -> t.Tuple[bool, t.Optional[str]]:
"""Check if the policy can be honored with resources available
in the allocation.
:param request: the DragonRunRequest to verify
:returns: Tuple indicating if the request can be honored and
an optional error message"""
if request.policy:
if request.policy.cpu_affinity:
# make sure some node has enough CPUs
available = max(self._cpus)
requested = max(request.policy.cpu_affinity)

if requested >= available:
return False, "Cannot satisfy request, not enough CPUs available"

if request.policy.gpu_affinity:
# make sure some node has enough GPUs
available = max(self._gpus)
requested = max(request.policy.gpu_affinity)

if requested >= available:
return False, "Cannot satisfy request, not enough GPUs available"

return True, None

def _can_honor_hosts(
self, request: DragonRunRequest
) -> t.Tuple[bool, t.Optional[str]]:
"""Check if the current state of the backend process inhibits executing
the request.
:param request: the DragonRunRequest to verify
:returns: Tuple indicating if the request can be honored and
an optional error message"""
if request.nodes > len(self._hosts):
message = f"Cannot satisfy request. Requested {request.nodes} nodes, "
message += f"but only {len(self._hosts)} nodes are available."
return False, message

requested_hosts: t.Set[str] = set()
if request.hostlist:
requested_hosts = set(request.hostlist)
if not requested_hosts:
return True, None

all_hosts = set(self._hosts)
valid_hosts = all_hosts.intersection(requested_hosts)

# don't worry about count when only hostnames are supplied (req.nodes == 0)
if request.nodes > len(valid_hosts):
message = f"Cannot satisfy request. Requested {request.nodes} nodes, "
message += f"but only {len(valid_hosts)} named hosts are available."
return False, message

return True, None

def _can_honor_state(
self, _request: DragonRunRequest
) -> t.Tuple[bool, t.Optional[str]]:
"""Check if the current state of the backend process inhibits executing
the request.
:param _request: the DragonRunRequest to verify
:returns: Tuple indicating if the request can be honored and
an optional error message"""
if self._shutdown_requested:
message = "Cannot satisfy request, server is shutting down."
return False, message

return True, None

def _allocate_step(
self, step_id: str, request: DragonRunRequest
) -> t.Optional[t.List[str]]:
"""Identify the hosts on which the request will be executed
:param step_id: The identifier of a step that will be executed on the host
:param request: The request to be executed
:returns: A list of selected hostnames"""
# ensure at least one host is selected
num_hosts: int = request.nodes # or 1 # todo - make at least 1 again

num_hosts: int = request.nodes
with self._queue_lock:
# # ensure positive integers in valid range are accepted
# if num_hosts <= 0 or num_hosts > len(self._hosts):
# return None

# if request.hostlist:
# tracking_info = self._prioritizer.next_n_from(
# num_hosts, [host for host in request.hostlist.split(",") if host]
# )
# else:
# tracking_info = self._prioritizer.next_n(num_hosts)

# # give back the list of node names
# to_allocate = [str(info[1]) for info in tracking_info]

# for host in to_allocate:
# self._assigned_steps[host] = step_id

# return to_allocate
if num_hosts <= 0 or num_hosts > len(self._free_hosts):
return None

to_allocate = []
for _ in range(num_hosts):
host = self._free_hosts.popleft()
self._allocated_hosts[host] = step_id
to_allocate.append(host)

return to_allocate

@staticmethod
Expand Down Expand Up @@ -641,10 +774,12 @@ def _refresh_statuses(self) -> None:
for host in group_info.hosts:
logger.debug(f"Releasing host {host}")
try:
self._allocated_hosts.pop(host)
self._allocated_hosts.pop(host) # todo: remove
self._assigned_steps.pop(host)
self._prioritizer.decrement(host)
except KeyError:
logger.error(f"Tried to free a non-allocated host: {host}")
self._free_hosts.append(host)
self._free_hosts.append(host) # todo: remove
group_info.process_group = None
group_info.redir_workers = None

Expand Down
2 changes: 2 additions & 0 deletions smartsim/_core/launcher/dragon/dragonLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def run(self, step: Step) -> t.Optional[str]:
merged_env = self._connector.merge_persisted_env(os.environ.copy())
nodes = int(run_args.get("nodes", None) or 1)
tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)
# hosts = str(run_args.get("host-list", ""))

policy = DragonRunPolicy.from_run_args(run_args)

Expand All @@ -187,6 +188,7 @@ def run(self, step: Step) -> t.Optional[str]:
output_file=out,
error_file=err,
policy=policy,
# hostlist=",".join(policy.hostlist),
)
),
DragonRunResponse,
Expand Down
Loading

0 comments on commit f42bbfa

Please sign in to comment.