From ef034d569ef736e4ab524dc140d002057228cca1 Mon Sep 17 00:00:00 2001 From: Chris McBride <3595025+ankona@users.noreply.github.com> Date: Sun, 25 Aug 2024 23:47:01 -0400 Subject: [PATCH] Enable specification of target hostname for a dragon task (#660) ## Description This PR adds two features: 1. Ability to specify hostnames that tasks should run on 2. Enable tasks colocation ### Specifying Hostnames The existing `DragonRunRequest` supported the ability to specify a hostname when creating a policy used to run a task. However, the hostnames were not exposed to clients. This ticket allows clients to pass a list of hosts that will be used in place of the default "first available host" behavior. ### Task Colocation The prior system for finding nodes to execute a task worked worked only with unassigned nodes. Any node assigned a task could not be assigned another task. This ticket adds a more capable prioritizer class that enables clients using hostnames to colocate tasks. It also retains the capability to return open nodes when no hostname is specified. --- doc/changelog.md | 1 + .../_core/launcher/dragon/dragonBackend.py | 225 +++++-- .../_core/launcher/dragon/dragonLauncher.py | 2 + smartsim/_core/launcher/dragon/pqueue.py | 467 +++++++++++++++ smartsim/_core/launcher/step/dragonStep.py | 2 + smartsim/settings/dragonRunSettings.py | 20 + tests/test_dragon_run_request.py | 341 ++++++----- tests/test_dragon_runsettings.py | 119 ++++ tests/test_dragon_step.py | 13 + tests/test_node_prioritizer.py | 555 ++++++++++++++++++ 10 files changed, 1542 insertions(+), 203 deletions(-) create mode 100644 smartsim/_core/launcher/dragon/pqueue.py create mode 100644 tests/test_node_prioritizer.py diff --git a/doc/changelog.md b/doc/changelog.md index 9240efbc8..964e62b49 100644 --- a/doc/changelog.md +++ b/doc/changelog.md @@ -13,6 +13,7 @@ Jump to: Description +- Enable hostname selection for dragon tasks - Remove pydantic dependency from MLI code - Update MLI environment variables using new naming convention - Reduce a copy by using torch.from_numpy instead of torch.tensor diff --git a/smartsim/_core/launcher/dragon/dragonBackend.py b/smartsim/_core/launcher/dragon/dragonBackend.py index daf18e2cb..2fda87646 100644 --- a/smartsim/_core/launcher/dragon/dragonBackend.py +++ b/smartsim/_core/launcher/dragon/dragonBackend.py @@ -45,6 +45,8 @@ import dragon.native.process_group as dragon_process_group import dragon.native.machine as dragon_machine +from smartsim._core.launcher.dragon.pqueue import NodePrioritizer, PrioritizerFilter + # pylint: enable=import-error # isort: on from ...._core.config import get_config @@ -190,6 +192,18 @@ def __init__(self, pid: int) -> None: self._view = DragonBackendView(self) logger.debug(self._view.host_desc) self._infra_ddict: t.Optional[dragon_ddict.DDict] = None + self._prioritizer = NodePrioritizer(self._nodes, self._queue_lock) + + self._nodes: t.List["dragon_machine.Node"] = [] + """Node capability information for hosts in the allocation""" + self._hosts: t.List[str] = [] + """List of hosts available in allocation""" + self._cpus: t.List[int] = [] + """List of cpu-count by node""" + self._gpus: t.List[int] = [] + """List of gpu-count by node""" + self._allocated_hosts: t.Dict[str, t.Set[str]] = {} + """Mapping with hostnames as keys and a set of running step IDs as the value""" @property def hosts(self) -> list[str]: @@ -197,34 +211,39 @@ def hosts(self) -> list[str]: return self._hosts @property - def allocated_hosts(self) -> dict[str, str]: + def allocated_hosts(self) -> dict[str, t.Set[str]]: + """A map of host names to the step id executing on a host + + :returns: Dictionary with host name as key and step id as value""" with self._queue_lock: return self._allocated_hosts @property - def free_hosts(self) -> t.Deque[str]: + def free_hosts(self) -> t.Sequence[str]: + """Find hosts that do not have a step assigned + + :returns: List of host names""" with self._queue_lock: - return self._free_hosts + return list(map(lambda x: x.hostname, self._prioritizer.unassigned())) @property def group_infos(self) -> dict[str, ProcessGroupInfo]: + """Find information pertaining to process groups executing on a host + + :returns: Dictionary with host name as key and group information as value""" with self._queue_lock: return self._group_infos def _initialize_hosts(self) -> None: + """Prepare metadata about the allocation""" with self._queue_lock: self._nodes = [ dragon_machine.Node(node) for node in dragon_machine.System().nodes ] - self._hosts: t.List[str] = sorted(node.hostname for node in self._nodes) + self._hosts = sorted(node.hostname for node in self._nodes) self._cpus = [node.num_cpus for node in self._nodes] self._gpus = [node.num_gpus for node in self._nodes] - - """List of hosts available in allocation""" - self._free_hosts: t.Deque[str] = collections.deque(self._hosts) - """List of hosts on which steps can be launched""" - self._allocated_hosts: t.Dict[str, str] = {} - """Mapping of hosts on which a step is already running to step ID""" + self._allocated_hosts = collections.defaultdict(set) def __str__(self) -> str: return self.status_message @@ -233,7 +252,7 @@ def __str__(self) -> str: def status_message(self) -> str: """Message with status of available nodes and history of launched jobs. - :returns: Status message + :returns: a status message """ return ( "Dragon server backend update\n" @@ -245,9 +264,8 @@ def _heartbeat(self) -> None: @property def cooldown_period(self) -> int: - """Time (in seconds) the server will wait before shutting down - - when exit conditions are met (see ``should_shutdown()`` for further details). + """Time (in seconds) the server will wait before shutting down when + exit conditions are met (see ``should_shutdown()`` for further details). """ return self._cooldown_period @@ -281,6 +299,8 @@ def should_shutdown(self) -> bool: and it requested immediate shutdown, or if it did not request immediate shutdown, but all jobs have been executed. In both cases, a cooldown period may need to be waited before shutdown. + + :returns: `True` if the server should terminate, otherwise `False` """ if self._shutdown_requested and self._can_shutdown: return self._has_cooled_down @@ -288,7 +308,9 @@ def should_shutdown(self) -> bool: @property def current_time(self) -> float: - """Current time for DragonBackend object, in seconds since the Epoch""" + """Current time for DragonBackend object, in seconds since the Epoch + + :returns: the current timestamp""" return time.time() def _can_honor_policy( @@ -296,63 +318,149 @@ def _can_honor_policy( ) -> t.Tuple[bool, t.Optional[str]]: """Check if the policy can be honored with resources available in the allocation. - :param request: DragonRunRequest containing policy information + + :param request: `DragonRunRequest` to validate :returns: Tuple indicating if the policy can be honored and an optional error message""" # ensure the policy can be honored if request.policy: + logger.debug(f"{request.policy=}{self._cpus=}{self._gpus=}") + if request.policy.cpu_affinity: # make sure some node has enough CPUs - available = max(self._cpus) + last_available = max(self._cpus or [-1]) requested = max(request.policy.cpu_affinity) - - if requested >= available: + if not any(self._cpus) or requested >= last_available: return False, "Cannot satisfy request, not enough CPUs available" - if request.policy.gpu_affinity: # make sure some node has enough GPUs - available = max(self._gpus) + last_available = max(self._gpus or [-1]) requested = max(request.policy.gpu_affinity) - - if requested >= available: + if not any(self._gpus) or requested >= last_available: + logger.warning( + f"failed check w/{self._gpus=}, {requested=}, {last_available=}" + ) return False, "Cannot satisfy request, not enough GPUs available" - return True, None def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]]: - """Check if request can be honored with resources available in the allocation. - - Currently only checks for total number of nodes, - in the future it will also look at other constraints - such as memory, accelerators, and so on. + """Check if request can be honored with resources available in + the allocation. Currently only checks for total number of nodes, + in the future it will also look at other constraints such as memory, + accelerators, and so on. + + :param request: `DragonRunRequest` to validate + :returns: Tuple indicating if the request can be honored and + an optional error message """ - if request.nodes > len(self._hosts): - message = f"Cannot satisfy request. Requested {request.nodes} nodes, " - message += f"but only {len(self._hosts)} nodes are available." - return False, message - if self._shutdown_requested: - message = "Cannot satisfy request, server is shutting down." - return False, message + honorable, err = self._can_honor_state(request) + if not honorable: + return False, err honorable, err = self._can_honor_policy(request) if not honorable: return False, err + honorable, err = self._can_honor_hosts(request) + if not honorable: + return False, err + + return True, None + + def _can_honor_hosts( + self, request: DragonRunRequest + ) -> t.Tuple[bool, t.Optional[str]]: + """Check if the current state of the backend process inhibits executing + the request. + + :param request: `DragonRunRequest` to validate + :returns: Tuple indicating if the request can be honored and + an optional error message""" + all_hosts = frozenset(self._hosts) + num_nodes = request.nodes + + # fail if requesting more nodes than the total number available + if num_nodes > len(all_hosts): + message = f"Cannot satisfy request. {num_nodes} requested nodes" + message += f" exceeds {len(all_hosts)} available." + return False, message + + requested_hosts = all_hosts + if request.hostlist: + requested_hosts = frozenset( + {host.strip() for host in request.hostlist.split(",")} + ) + + valid_hosts = all_hosts.intersection(requested_hosts) + invalid_hosts = requested_hosts - valid_hosts + + logger.debug(f"{num_nodes=}{valid_hosts=}{invalid_hosts=}") + + if invalid_hosts: + logger.warning(f"Some invalid hostnames were requested: {invalid_hosts}") + + # fail if requesting specific hostnames and there aren't enough available + if num_nodes > len(valid_hosts): + message = f"Cannot satisfy request. Requested {num_nodes} nodes, " + message += f"but only {len(valid_hosts)} named hosts are available." + return False, message + + return True, None + + def _can_honor_state( + self, _request: DragonRunRequest + ) -> t.Tuple[bool, t.Optional[str]]: + """Check if the current state of the backend process inhibits executing + the request. + :param _request: the DragonRunRequest to verify + :returns: Tuple indicating if the request can be honored and + an optional error message""" + if self._shutdown_requested: + message = "Cannot satisfy request, server is shutting down." + return False, message + return True, None def _allocate_step( self, step_id: str, request: DragonRunRequest ) -> t.Optional[t.List[str]]: + """Identify the hosts on which the request will be executed + :param step_id: The identifier of a step that will be executed on the host + :param request: The request to be executed + :returns: A list of selected hostnames""" + # ensure at least one host is selected num_hosts: int = request.nodes + with self._queue_lock: - if num_hosts <= 0 or num_hosts > len(self._free_hosts): + if num_hosts <= 0 or num_hosts > len(self._hosts): + logger.debug( + f"The number of requested hosts ({num_hosts}) is invalid or" + f" cannot be satisfied with {len(self._hosts)} available nodes" + ) return None - to_allocate = [] - for _ in range(num_hosts): - host = self._free_hosts.popleft() - self._allocated_hosts[host] = step_id - to_allocate.append(host) + + hosts = [] + if request.hostlist: + # convert the comma-separated argument into a real list + hosts = [host for host in request.hostlist.split(",") if host] + + filter_on: t.Optional[PrioritizerFilter] = None + if request.policy and request.policy.gpu_affinity: + filter_on = PrioritizerFilter.GPU + + nodes = self._prioritizer.next_n(num_hosts, filter_on, step_id, hosts) + + if len(nodes) < num_hosts: + # exit if the prioritizer can't identify enough nodes + return None + + to_allocate = [node.hostname for node in nodes] + + for hostname in to_allocate: + # track assigning this step to each node + self._allocated_hosts[hostname].add(step_id) + return to_allocate @staticmethod @@ -392,6 +500,7 @@ def _create_redirect_workers( return grp_redir def _stop_steps(self) -> None: + """Trigger termination of all currently executing steps""" self._heartbeat() with self._queue_lock: while len(self._stop_requests) > 0: @@ -451,6 +560,7 @@ def create_run_policy( request: DragonRequest, node_name: str ) -> "dragon_policy.Policy": """Create a dragon Policy from the request and node name + :param request: DragonRunRequest containing policy information :param node_name: Name of the node on which the process will run :returns: dragon_policy.Policy object mapped from request properties""" @@ -586,9 +696,11 @@ def _start_steps(self) -> None: logger.error(e) def _refresh_statuses(self) -> None: + """Query underlying management system for step status and update + stored assigned and unassigned task information""" self._heartbeat() with self._queue_lock: - terminated = [] + terminated: t.Set[str] = set() for step_id in self._running_steps: group_info = self._group_infos[step_id] grp = group_info.process_group @@ -622,11 +734,15 @@ def _refresh_statuses(self) -> None: ) if group_info.status in TERMINAL_STATUSES: - terminated.append(step_id) + terminated.add(step_id) if terminated: logger.debug(f"{terminated=}") + # remove all the terminated steps from all hosts + for host in list(self._allocated_hosts.keys()): + self._allocated_hosts[host].difference_update(terminated) + for step_id in terminated: self._running_steps.remove(step_id) self._completed_steps.append(step_id) @@ -634,11 +750,13 @@ def _refresh_statuses(self) -> None: if group_info is not None: for host in group_info.hosts: logger.debug(f"Releasing host {host}") - try: - self._allocated_hosts.pop(host) - except KeyError: + if host not in self._allocated_hosts: logger.error(f"Tried to free a non-allocated host: {host}") - self._free_hosts.append(host) + else: + # remove any hosts that have had all their steps terminated + if not self._allocated_hosts[host]: + self._allocated_hosts.pop(host) + self._prioritizer.decrement(host, step_id) group_info.process_group = None group_info.redir_workers = None @@ -662,6 +780,7 @@ def _should_print_status(self) -> bool: return False def _update(self) -> None: + """Trigger all update queries and update local state database""" self._stop_steps() self._start_steps() self._refresh_statuses() @@ -749,8 +868,12 @@ def _(self, request: DragonShutdownRequest) -> DragonShutdownResponse: class DragonBackendView: - def __init__(self, backend: DragonBackend): + def __init__(self, backend: DragonBackend) -> None: + """Initialize the instance + + :param backend: A dragon backend used to produce the view""" self._backend = backend + """A dragon backend used to produce the view""" @property def host_desc(self) -> str: @@ -812,9 +935,7 @@ def step_table(self) -> str: @property def host_table(self) -> str: """Table representation of current state of nodes available - - in the allocation. - """ + in the allocation.""" headers = ["Host", "Status"] hosts = self._backend.hosts free_hosts = self._backend.free_hosts diff --git a/smartsim/_core/launcher/dragon/dragonLauncher.py b/smartsim/_core/launcher/dragon/dragonLauncher.py index 9078fed54..e8391410b 100644 --- a/smartsim/_core/launcher/dragon/dragonLauncher.py +++ b/smartsim/_core/launcher/dragon/dragonLauncher.py @@ -170,6 +170,7 @@ def run(self, step: Step) -> t.Optional[str]: merged_env = self._connector.merge_persisted_env(os.environ.copy()) nodes = int(run_args.get("nodes", None) or 1) tasks_per_node = int(run_args.get("tasks-per-node", None) or 1) + hosts = str(run_args.get("host-list", "")) policy = DragonRunPolicy.from_run_args(run_args) @@ -187,6 +188,7 @@ def run(self, step: Step) -> t.Optional[str]: output_file=out, error_file=err, policy=policy, + hostlist=hosts, ) ), DragonRunResponse, diff --git a/smartsim/_core/launcher/dragon/pqueue.py b/smartsim/_core/launcher/dragon/pqueue.py new file mode 100644 index 000000000..a9faf76b1 --- /dev/null +++ b/smartsim/_core/launcher/dragon/pqueue.py @@ -0,0 +1,467 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# import collections +import enum +import heapq +import threading +import typing as t + +from smartsim.error.errors import SmartSimError +from smartsim.log import get_logger + +logger = get_logger(__name__) + + +class Node(t.Protocol): + """Base Node API required to support the NodePrioritizer""" + + @property + def hostname(self) -> str: + """The hostname of the node""" + + @property + def num_cpus(self) -> int: + """The number of CPUs in the node""" + + @property + def num_gpus(self) -> int: + """The number of GPUs in the node""" + + +class NodeReferenceCount(t.Protocol): + """Contains details pertaining to references to a node""" + + @property + def hostname(self) -> str: + """The hostname of the node""" + + @property + def num_refs(self) -> int: + """The number of jobs assigned to the node""" + + +class _TrackedNode: + """Node API required to have support in the NodePrioritizer""" + + def __init__(self, node: Node) -> None: + self._node = node + """The node being tracked""" + self._num_refs = 0 + """The number of references to the tracked node""" + self._assigned_tasks: t.Set[str] = set() + """The unique identifiers of processes using this node""" + self._is_dirty = False + """Flag indicating that tracking information has been modified""" + + @property + def hostname(self) -> str: + """Returns the hostname of the node""" + return self._node.hostname + + @property + def num_cpus(self) -> int: + """Returns the number of CPUs in the node""" + return self._node.num_cpus + + @property + def num_gpus(self) -> int: + """Returns the number of GPUs attached to the node""" + return self._node.num_gpus + + @property + def num_refs(self) -> int: + """Returns the number of processes currently running on the node""" + return self._num_refs + + @property + def is_assigned(self) -> bool: + """Returns `True` if no references are currently counted, `False` otherwise""" + return self._num_refs > 0 + + @property + def assigned_tasks(self) -> t.Set[str]: + """Returns the set of unique IDs for currently running processes""" + return self._assigned_tasks + + @property + def is_dirty(self) -> bool: + """Returns a flag indicating if the reference counter has changed. `True` + if references have been added or removed, `False` otherwise.""" + return self._is_dirty + + def clean(self) -> None: + """Marks the node as unmodified""" + self._is_dirty = False + + def add( + self, + tracking_id: t.Optional[str] = None, + ) -> None: + """Update the node to indicate the addition of a process that must be + reference counted. + + :param tracking_id: a unique task identifier executing on the node + to add + :raises ValueError: if tracking_id is already assigned to this node""" + if tracking_id in self.assigned_tasks: + raise ValueError("Attempted adding task more than once") + + self._num_refs = self._num_refs + 1 + if tracking_id: + self._assigned_tasks = self._assigned_tasks.union({tracking_id}) + self._is_dirty = True + + def remove( + self, + tracking_id: t.Optional[str] = None, + ) -> None: + """Update the reference counter to indicate the removal of a process. + + :param tracking_id: a unique task identifier executing on the node + to remove + :raises ValueError: if tracking_id is already assigned to this node""" + if tracking_id and tracking_id not in self.assigned_tasks: + raise ValueError("Attempted removal of untracked item") + + self._num_refs = max(self._num_refs - 1, 0) + if tracking_id: + self._assigned_tasks = self._assigned_tasks - {tracking_id} + self._is_dirty = True + + def __lt__(self, other: "_TrackedNode") -> bool: + """Comparison operator used to evaluate the ordering of nodes within + the prioritizer. This comparison only considers reference counts. + + :param other: Another node to compare against + :returns: True if this node has fewer references than the other node""" + if self.num_refs < other.num_refs: + return True + + return False + + +class PrioritizerFilter(str, enum.Enum): + """A filter used to select a subset of nodes to be queried""" + + CPU = enum.auto() + GPU = enum.auto() + + +class NodePrioritizer: + def __init__(self, nodes: t.List[Node], lock: threading.RLock) -> None: + """Initialize the prioritizer + + :param nodes: node attribute information for initializing the priorizer + :param lock: a lock used to ensure threadsafe operations + :raises SmartSimError: if the nodes collection is empty + """ + if not nodes: + raise SmartSimError("Missing nodes to prioritize") + + self._lock = lock + """Lock used to ensure thread safe changes of the reference counters""" + self._cpu_refs: t.List[_TrackedNode] = [] + """Track reference counts to CPU-only nodes""" + self._gpu_refs: t.List[_TrackedNode] = [] + """Track reference counts to GPU nodes""" + self._nodes: t.Dict[str, _TrackedNode] = {} + + self._initialize_reference_counters(nodes) + + def _initialize_reference_counters(self, nodes: t.List[Node]) -> None: + """Perform initialization of reference counters for nodes in the allocation + + :param nodes: node attribute information for initializing the priorizer""" + for node in nodes: + # create a set of reference counters for the nodes + tracked = _TrackedNode(node) + + self._nodes[node.hostname] = tracked # for O(1) access + + if node.num_gpus: + self._gpu_refs.append(tracked) + else: + self._cpu_refs.append(tracked) + + def increment( + self, host: str, tracking_id: t.Optional[str] = None + ) -> NodeReferenceCount: + """Directly increment the reference count of a given node and ensure the + ref counter is marked as dirty to trigger a reordering on retrieval + + :param host: a hostname that should have a reference counter selected + :param tracking_id: a unique task identifier executing on the node + to add""" + with self._lock: + tracked_node = self._nodes[host] + tracked_node.add(tracking_id) + return tracked_node + + def _heapify_all_refs(self) -> t.List[_TrackedNode]: + """Combine the CPU and GPU nodes into a single heap + + :returns: list of all reference counters""" + refs = [*self._cpu_refs, *self._gpu_refs] + heapq.heapify(refs) + return refs + + def get_tracking_info(self, host: str) -> NodeReferenceCount: + """Returns the reference counter information for a single node + + :param host: a hostname that should have a reference counter selected + :returns: a reference counter for the node + :raises ValueError: if the hostname is not in the set of managed nodes""" + if host not in self._nodes: + raise ValueError("The supplied hostname was not found") + + return self._nodes[host] + + def decrement( + self, host: str, tracking_id: t.Optional[str] = None + ) -> NodeReferenceCount: + """Directly decrement the reference count of a given node and ensure the + ref counter is marked as dirty to trigger a reordering + + :param host: a hostname that should have a reference counter decremented + :param tracking_id: unique task identifier to remove""" + with self._lock: + tracked_node = self._nodes[host] + tracked_node.remove(tracking_id) + + return tracked_node + + def _create_sub_heap( + self, + hosts: t.Optional[t.List[str]] = None, + filter_on: t.Optional[PrioritizerFilter] = None, + ) -> t.List[_TrackedNode]: + """Create a new heap from the primary heap with user-specified nodes + + :param hosts: a list of hostnames used to filter the available nodes + :returns: a list of assigned reference counters + """ + nodes_tracking_info: t.List[_TrackedNode] = [] + heap = self._get_filtered_heap(filter_on) + + # Collect all the tracking info for the requested nodes... + for node in heap: + if not hosts or node.hostname in hosts: + nodes_tracking_info.append(node) + + # ... and use it to create a new heap from a specified subset of nodes + heapq.heapify(nodes_tracking_info) + + return nodes_tracking_info + + def unassigned( + self, heap: t.Optional[t.List[_TrackedNode]] = None + ) -> t.Sequence[Node]: + """Select nodes that are currently not assigned a task + + :param heap: a subset of the node heap to consider + :returns: a list of reference counts for all unassigned nodes""" + if heap is None: + heap = list(self._nodes.values()) + + nodes: t.List[_TrackedNode] = [] + for item in heap: + if item.num_refs == 0: + nodes.append(item) + return nodes + + def assigned( + self, heap: t.Optional[t.List[_TrackedNode]] = None + ) -> t.Sequence[Node]: + """Helper method to identify the nodes that are currently assigned + + :param heap: a subset of the node heap to consider + :returns: a list of reference counts for all assigned nodes""" + if heap is None: + heap = list(self._nodes.values()) + + nodes: t.List[_TrackedNode] = [] + for item in heap: + if item.num_refs > 0: + nodes.append(item) + return nodes + + def _check_satisfiable_n( + self, num_items: int, heap: t.Optional[t.List[_TrackedNode]] = None + ) -> bool: + """Validates that a request for some number of nodes `n` can be + satisfied by the prioritizer given the set of nodes available + + :param num_items: the desired number of nodes to allocate + :param heap: a subset of the node heap to consider + :returns: True if the request can be fulfilled, False otherwise""" + num_nodes = len(self._nodes.keys()) + + if num_items < 1: + msg = "Cannot handle request; request requires a positive integer" + logger.warning(msg) + return False + + if num_nodes < num_items: + msg = f"Cannot satisfy request for {num_items} nodes; {num_nodes} in pool" + logger.warning(msg) + return False + + num_open = len(self.unassigned(heap)) + if num_open < num_items: + msg = f"Cannot satisfy request for {num_items} nodes; {num_open} available" + logger.warning(msg) + return False + + return True + + def _get_next_unassigned_node( + self, + heap: t.List[_TrackedNode], + tracking_id: t.Optional[str] = None, + ) -> t.Optional[Node]: + """Finds the next node with no running processes and + ensures that any elements that were directly updated are updated in + the priority structure before being made available + + :param heap: a subset of the node heap to consider + :param tracking_id: unique task identifier to track + :returns: a reference counter for an available node if an unassigned node + exists, `None` otherwise""" + tracking_info: t.Optional[_TrackedNode] = None + + with self._lock: + # re-sort the heap to handle any tracking changes + if any(node.is_dirty for node in heap): + heapq.heapify(heap) + + # grab the min node from the heap + tracking_info = heapq.heappop(heap) + + # the node is available if it has no assigned tasks + is_assigned = tracking_info.is_assigned + if not is_assigned: + # track the new process on the node + tracking_info.add(tracking_id) + + # add the node that was popped back into the heap + heapq.heappush(heap, tracking_info) + + # mark all nodes as clean now that everything is updated & sorted + for node in heap: + node.clean() + + # next available must only return previously unassigned nodes + if is_assigned: + return None + + return tracking_info + + def _get_next_n_available_nodes( + self, + num_items: int, + heap: t.List[_TrackedNode], + tracking_id: t.Optional[str] = None, + ) -> t.List[Node]: + """Find the next N available nodes w/least amount of references using + the supplied filter to target a specific node capability + + :param num_items: number of nodes to reserve + :param heap: a subset of the node heap to consider + :param tracking_id: unique task identifier to track + :returns: a list of reference counters for a available nodes if enough + unassigned nodes exists, `None` otherwise + :raises ValueError: if the number of requested nodes is not a positive integer + """ + next_nodes: t.List[Node] = [] + + if num_items < 1: + raise ValueError(f"Number of items requested {num_items} is invalid") + + if not self._check_satisfiable_n(num_items, heap): + return next_nodes + + while len(next_nodes) < num_items: + if next_node := self._get_next_unassigned_node(heap, tracking_id): + next_nodes.append(next_node) + continue + break + + return next_nodes + + def _get_filtered_heap( + self, filter_on: t.Optional[PrioritizerFilter] = None + ) -> t.List[_TrackedNode]: + """Helper method to select the set of nodes to include in a filtered + heap. + + :param filter_on: A list of nodes that satisfy the filter. If no + filter is supplied, all nodes are returned""" + if filter_on == PrioritizerFilter.GPU: + return self._gpu_refs + if filter_on == PrioritizerFilter.CPU: + return self._cpu_refs + + return self._heapify_all_refs() + + def next( + self, + filter_on: t.Optional[PrioritizerFilter] = None, + tracking_id: t.Optional[str] = None, + hosts: t.Optional[t.List[str]] = None, + ) -> t.Optional[Node]: + """Find the next unsassigned node using the supplied filter to target + a specific node capability + + :param filter_on: the subset of nodes to query for available nodes + :param tracking_id: unique task identifier to track + :param hosts: a list of hostnames used to filter the available nodes + :returns: a reference counter for an available node if an unassigned node + exists, `None` otherwise""" + if results := self.next_n(1, filter_on, tracking_id, hosts): + return results[0] + return None + + def next_n( + self, + num_items: int = 1, + filter_on: t.Optional[PrioritizerFilter] = None, + tracking_id: t.Optional[str] = None, + hosts: t.Optional[t.List[str]] = None, + ) -> t.List[Node]: + """Find the next N available nodes w/least amount of references using + the supplied filter to target a specific node capability + + :param num_items: number of nodes to reserve + :param filter_on: the subset of nodes to query for available nodes + :param tracking_id: unique task identifier to track + :param hosts: a list of hostnames used to filter the available nodes + :returns: Collection of reserved nodes + :raises ValueError: if the hosts parameter is an empty list""" + if hosts is not None and not hosts: + raise ValueError("No hostnames provided") + + heap = self._create_sub_heap(hosts, filter_on) + return self._get_next_n_available_nodes(num_items, heap, tracking_id) diff --git a/smartsim/_core/launcher/step/dragonStep.py b/smartsim/_core/launcher/step/dragonStep.py index dd93d7910..21fdc697c 100644 --- a/smartsim/_core/launcher/step/dragonStep.py +++ b/smartsim/_core/launcher/step/dragonStep.py @@ -169,6 +169,7 @@ def _write_request_file(self) -> str: env = run_settings.env_vars nodes = int(run_args.get("nodes", None) or 1) tasks_per_node = int(run_args.get("tasks-per-node", None) or 1) + hosts_csv = str(run_args.get("host-list", "")) policy = DragonRunPolicy.from_run_args(run_args) @@ -187,6 +188,7 @@ def _write_request_file(self) -> str: output_file=out, error_file=err, policy=policy, + hostlist=hosts_csv, ) requests.append(request_registry.to_string(request)) with open(request_file, "w", encoding="utf-8") as script_file: diff --git a/smartsim/settings/dragonRunSettings.py b/smartsim/settings/dragonRunSettings.py index 69a91547e..15e585544 100644 --- a/smartsim/settings/dragonRunSettings.py +++ b/smartsim/settings/dragonRunSettings.py @@ -95,6 +95,26 @@ def set_node_feature(self, feature_list: t.Union[str, t.List[str]]) -> None: self.run_args["node-feature"] = ",".join(feature_list) + @override + def set_hostlist(self, host_list: t.Union[str, t.List[str]]) -> None: + """Specify the hostlist for this job + + :param host_list: hosts to launch on + :raises ValueError: if an empty host list is supplied + """ + if not host_list: + raise ValueError("empty hostlist provided") + + if isinstance(host_list, str): + host_list = host_list.replace(" ", "").split(",") + + # strip out all whitespace-only values + cleaned_list = [host.strip() for host in host_list if host and host.strip()] + if not len(cleaned_list) == len(host_list): + raise ValueError(f"invalid names found in hostlist: {host_list}") + + self.run_args["host-list"] = ",".join(cleaned_list) + def set_cpu_affinity(self, devices: t.List[int]) -> None: """Set the CPU affinity for this job diff --git a/tests/test_dragon_run_request.py b/tests/test_dragon_run_request.py index 94c17c222..5ff95f408 100644 --- a/tests/test_dragon_run_request.py +++ b/tests/test_dragon_run_request.py @@ -30,18 +30,14 @@ import time from unittest.mock import MagicMock +import pydantic.error_wrappers import pytest -from pydantic import ValidationError + +from smartsim._core.launcher.dragon.pqueue import NodePrioritizer # The tests in this file belong to the group_b group pytestmark = pytest.mark.group_b - -try: - import dragon - - dragon_loaded = True -except: - dragon_loaded = False +dragon = pytest.importorskip("dragon") from smartsim._core.config import CONFIG from smartsim._core.schemas.dragonRequests import * @@ -56,38 +52,6 @@ ) -class NodeMock(MagicMock): - def __init__( - self, name: t.Optional[str] = None, num_gpus: int = 2, num_cpus: int = 8 - ) -> None: - super().__init__() - self._mock_id = name - NodeMock._num_gpus = num_gpus - NodeMock._num_cpus = num_cpus - - @property - def hostname(self) -> str: - if self._mock_id: - return self._mock_id - return create_short_id_str() - - @property - def num_cpus(self) -> str: - return NodeMock._num_cpus - - @property - def num_gpus(self) -> str: - return NodeMock._num_gpus - - def _set_id(self, value: str) -> None: - self._mock_id = value - - def gpus(self, parent: t.Any = None) -> t.List[str]: - if self._num_gpus: - return [f"{self.hostname}-gpu{i}" for i in range(NodeMock._num_gpus)] - return [] - - class GroupStateMock(MagicMock): def Running(self) -> MagicMock: running = MagicMock(**{"__str__.return_value": "Running"}) @@ -102,69 +66,59 @@ class ProcessGroupMock(MagicMock): puids = [121, 122] -def node_mock() -> NodeMock: - return NodeMock() - - def get_mock_backend( - monkeypatch: pytest.MonkeyPatch, num_gpus: int = 2 + monkeypatch: pytest.MonkeyPatch, num_cpus: int, num_gpus: int ) -> "DragonBackend": - + # create all the necessary namespaces as raw magic mocks + monkeypatch.setitem(sys.modules, "dragon.data.ddict.ddict", MagicMock()) + monkeypatch.setitem(sys.modules, "dragon.native.machine", MagicMock()) + monkeypatch.setitem(sys.modules, "dragon.native.group_state", MagicMock()) + monkeypatch.setitem(sys.modules, "dragon.native.process_group", MagicMock()) + monkeypatch.setitem(sys.modules, "dragon.native.process", MagicMock()) + monkeypatch.setitem(sys.modules, "dragon.infrastructure.connection", MagicMock()) + monkeypatch.setitem(sys.modules, "dragon.infrastructure.policy", MagicMock()) + monkeypatch.setitem(sys.modules, "dragon.infrastructure.process_desc", MagicMock()) + monkeypatch.setitem(sys.modules, "dragon.data.ddict.ddict", MagicMock()) + + node_list = ["node1", "node2", "node3"] + system_mock = MagicMock(return_value=MagicMock(nodes=node_list)) + node_mock = lambda x: MagicMock(hostname=x, num_cpus=num_cpus, num_gpus=num_gpus) + process_group_mock = MagicMock(return_value=ProcessGroupMock()) process_mock = MagicMock(returncode=0) - process_group_mock = MagicMock(**{"Process.return_value": ProcessGroupMock()}) - process_module_mock = MagicMock() - process_module_mock.Process = process_mock - node_mock = NodeMock(num_gpus=num_gpus) - system_mock = MagicMock(nodes=["node1", "node2", "node3"]) + policy_mock = MagicMock(return_value=MagicMock()) + group_state_mock = GroupStateMock() + + # customize members that must perform specific actions within the namespaces monkeypatch.setitem( sys.modules, "dragon", MagicMock( **{ - "native.machine.Node.return_value": node_mock, - "native.machine.System.return_value": system_mock, - "native.group_state": GroupStateMock(), - "native.process_group.ProcessGroup.return_value": ProcessGroupMock(), + "native.machine.Node": node_mock, + "native.machine.System": system_mock, + "native.group_state": group_state_mock, + "native.process_group.ProcessGroup": process_group_mock, + "native.process_group.Process": process_mock, + "native.process.Process": process_mock, + "infrastructure.policy.Policy": policy_mock, } ), ) - monkeypatch.setitem( - sys.modules, - "dragon.infrastructure.connection", - MagicMock(), - ) - monkeypatch.setitem( - sys.modules, - "dragon.infrastructure.process_desc", - MagicMock(), - ) - monkeypatch.setitem( - sys.modules, - "dragon.data.ddict.ddict", - MagicMock(), - ) - monkeypatch.setitem( - sys.modules, - "dragon.infrastructure.policy", - MagicMock(**{"Policy.return_value": MagicMock()}), - ) - monkeypatch.setitem(sys.modules, "dragon.native.process", process_module_mock) - monkeypatch.setitem(sys.modules, "dragon.native.process_group", process_group_mock) - monkeypatch.setitem(sys.modules, "dragon.native.group_state", GroupStateMock()) - monkeypatch.setitem( - sys.modules, - "dragon.native.machine", - MagicMock( - **{"System.return_value": system_mock, "Node.return_value": node_mock} - ), - ) from smartsim._core.launcher.dragon.dragonBackend import DragonBackend dragon_backend = DragonBackend(pid=99999) - monkeypatch.setattr( - dragon_backend, "_free_hosts", collections.deque(dragon_backend._hosts) + + # NOTE: we're manually updating these values due to issue w/mocking namespaces + dragon_backend._prioritizer = NodePrioritizer( + [ + MagicMock(num_cpus=num_cpus, num_gpus=num_gpus, hostname=node) + for node in node_list + ], + dragon_backend._queue_lock, ) + dragon_backend._cpus = [num_cpus] * len(node_list) + dragon_backend._gpus = [num_gpus] * len(node_list) return dragon_backend @@ -222,16 +176,14 @@ def set_mock_group_infos( } monkeypatch.setattr(dragon_backend, "_group_infos", group_infos) - monkeypatch.setattr(dragon_backend, "_free_hosts", collections.deque(hosts[1:3])) monkeypatch.setattr(dragon_backend, "_allocated_hosts", {hosts[0]: "abc123-1"}) monkeypatch.setattr(dragon_backend, "_running_steps", ["abc123-1"]) return group_infos -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_handshake_request(monkeypatch: pytest.MonkeyPatch) -> None: - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) handshake_req = DragonHandshakeRequest() handshake_resp = dragon_backend.process_request(handshake_req) @@ -240,9 +192,8 @@ def test_handshake_request(monkeypatch: pytest.MonkeyPatch) -> None: assert handshake_resp.dragon_pid == 99999 -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None: - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) run_req = DragonRunRequest( exe="sleep", exe_args=["5"], @@ -269,7 +220,7 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None: assert dragon_backend._running_steps == [step_id] assert len(dragon_backend._queued_steps) == 0 - assert len(dragon_backend._free_hosts) == 1 + assert len(dragon_backend.free_hosts) == 1 assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id @@ -281,7 +232,7 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None: assert dragon_backend._running_steps == [step_id] assert len(dragon_backend._queued_steps) == 0 - assert len(dragon_backend._free_hosts) == 1 + assert len(dragon_backend.free_hosts) == 1 assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id @@ -291,9 +242,8 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None: assert not dragon_backend._running_steps -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_deny_run_request(monkeypatch: pytest.MonkeyPatch) -> None: - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) dragon_backend._shutdown_requested = True @@ -319,7 +269,7 @@ def test_deny_run_request(monkeypatch: pytest.MonkeyPatch) -> None: def test_run_request_with_empty_policy(monkeypatch: pytest.MonkeyPatch) -> None: """Verify that a policy is applied to a run request""" - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) run_req = DragonRunRequest( exe="sleep", exe_args=["5"], @@ -335,10 +285,9 @@ def test_run_request_with_empty_policy(monkeypatch: pytest.MonkeyPatch) -> None: assert run_req.policy is None -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None: """Verify that a policy is applied to a run request""" - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) run_req = DragonRunRequest( exe="sleep", exe_args=["5"], @@ -366,7 +315,7 @@ def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None: assert dragon_backend._running_steps == [step_id] assert len(dragon_backend._queued_steps) == 0 - assert len(dragon_backend._free_hosts) == 1 + assert len(dragon_backend._prioritizer.unassigned()) == 1 assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id @@ -378,7 +327,7 @@ def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None: assert dragon_backend._running_steps == [step_id] assert len(dragon_backend._queued_steps) == 0 - assert len(dragon_backend._free_hosts) == 1 + assert len(dragon_backend._prioritizer.unassigned()) == 1 assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id @@ -388,9 +337,8 @@ def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None: assert not dragon_backend._running_steps -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_udpate_status_request(monkeypatch: pytest.MonkeyPatch) -> None: - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) group_infos = set_mock_group_infos(monkeypatch, dragon_backend) @@ -405,9 +353,8 @@ def test_udpate_status_request(monkeypatch: pytest.MonkeyPatch) -> None: } -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_stop_request(monkeypatch: pytest.MonkeyPatch) -> None: - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) group_infos = set_mock_group_infos(monkeypatch, dragon_backend) running_steps = [ @@ -434,10 +381,9 @@ def test_stop_request(monkeypatch: pytest.MonkeyPatch) -> None: ) assert len(dragon_backend._allocated_hosts) == 0 - assert len(dragon_backend._free_hosts) == 3 + assert len(dragon_backend._prioritizer.unassigned()) == 3 -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") @pytest.mark.parametrize( "immediate, kill_jobs, frontend_shutdown", [ @@ -456,7 +402,7 @@ def test_shutdown_request( frontend_shutdown: bool, ) -> None: monkeypatch.setenv("SMARTSIM_FLAG_TELEMETRY", "0") - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) monkeypatch.setattr(dragon_backend, "_cooldown_period", 1) set_mock_group_infos(monkeypatch, dragon_backend) @@ -496,11 +442,10 @@ def test_shutdown_request( assert dragon_backend._has_cooled_down == kill_jobs -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") @pytest.mark.parametrize("telemetry_flag", ["0", "1"]) def test_cooldown_is_set(monkeypatch: pytest.MonkeyPatch, telemetry_flag: str) -> None: monkeypatch.setenv("SMARTSIM_FLAG_TELEMETRY", telemetry_flag) - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) expected_cooldown = ( 2 * CONFIG.telemetry_frequency + 5 if int(telemetry_flag) > 0 else 5 @@ -512,19 +457,17 @@ def test_cooldown_is_set(monkeypatch: pytest.MonkeyPatch, telemetry_flag: str) - assert dragon_backend.cooldown_period == expected_cooldown -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_heartbeat_and_time(monkeypatch: pytest.MonkeyPatch) -> None: - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) first_heartbeat = dragon_backend.last_heartbeat assert dragon_backend.current_time > first_heartbeat dragon_backend._heartbeat() assert dragon_backend.last_heartbeat > first_heartbeat -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") @pytest.mark.parametrize("num_nodes", [1, 3, 100]) def test_can_honor(monkeypatch: pytest.MonkeyPatch, num_nodes: int) -> None: - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) run_req = DragonRunRequest( exe="sleep", exe_args=["5"], @@ -537,18 +480,42 @@ def test_can_honor(monkeypatch: pytest.MonkeyPatch, num_nodes: int) -> None: pmi_enabled=False, ) - assert dragon_backend._can_honor(run_req)[0] == ( - num_nodes <= len(dragon_backend._hosts) - ) + can_honor, error_msg = dragon_backend._can_honor(run_req) + + nodes_in_range = num_nodes <= len(dragon_backend._hosts) + assert can_honor == nodes_in_range + assert error_msg is None if nodes_in_range else error_msg is not None + + +@pytest.mark.parametrize("num_nodes", [-10, -1, 0]) +def test_can_honor_invalid_num_nodes( + monkeypatch: pytest.MonkeyPatch, num_nodes: int +) -> None: + """Verify that requests for invalid numbers of nodes (negative, zero) are rejected""" + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) + + with pytest.raises(pydantic.error_wrappers.ValidationError) as ex: + DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=num_nodes, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + ) -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") @pytest.mark.parametrize("affinity", [[0], [0, 1], list(range(8))]) def test_can_honor_cpu_affinity( monkeypatch: pytest.MonkeyPatch, affinity: t.List[int] ) -> None: """Verify that valid CPU affinities are accepted""" - dragon_backend = get_mock_backend(monkeypatch) + num_cpus, num_gpus = 8, 0 + dragon_backend = get_mock_backend(monkeypatch, num_cpus=num_cpus, num_gpus=num_gpus) + run_req = DragonRunRequest( exe="sleep", exe_args=["5"], @@ -565,11 +532,10 @@ def test_can_honor_cpu_affinity( assert dragon_backend._can_honor(run_req)[0] -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_can_honor_cpu_affinity_out_of_range(monkeypatch: pytest.MonkeyPatch) -> None: """Verify that invalid CPU affinities are NOT accepted NOTE: negative values are captured by the Pydantic schema""" - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) run_req = DragonRunRequest( exe="sleep", exe_args=["5"], @@ -586,13 +552,15 @@ def test_can_honor_cpu_affinity_out_of_range(monkeypatch: pytest.MonkeyPatch) -> assert not dragon_backend._can_honor(run_req)[0] -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") @pytest.mark.parametrize("affinity", [[0], [0, 1]]) def test_can_honor_gpu_affinity( monkeypatch: pytest.MonkeyPatch, affinity: t.List[int] ) -> None: """Verify that valid GPU affinities are accepted""" - dragon_backend = get_mock_backend(monkeypatch) + + num_cpus, num_gpus = 8, 2 + dragon_backend = get_mock_backend(monkeypatch, num_cpus=num_cpus, num_gpus=num_gpus) + run_req = DragonRunRequest( exe="sleep", exe_args=["5"], @@ -609,11 +577,10 @@ def test_can_honor_gpu_affinity( assert dragon_backend._can_honor(run_req)[0] -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_can_honor_gpu_affinity_out_of_range(monkeypatch: pytest.MonkeyPatch) -> None: """Verify that invalid GPU affinities are NOT accepted NOTE: negative values are captured by the Pydantic schema""" - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) run_req = DragonRunRequest( exe="sleep", exe_args=["5"], @@ -630,46 +597,45 @@ def test_can_honor_gpu_affinity_out_of_range(monkeypatch: pytest.MonkeyPatch) -> assert not dragon_backend._can_honor(run_req)[0] -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_can_honor_gpu_device_not_available(monkeypatch: pytest.MonkeyPatch) -> None: """Verify that a request for a GPU if none exists is not accepted""" # create a mock node class that always reports no GPUs available - dragon_backend = get_mock_backend(monkeypatch, num_gpus=0) - - run_req = DragonRunRequest( - exe="sleep", - exe_args=["5"], - path="/a/fake/path", - nodes=2, - tasks=1, - tasks_per_node=1, - env={}, - current_env={}, - pmi_enabled=False, - # specify GPU device w/no affinity - policy=DragonRunPolicy(gpu_affinity=[0]), - ) - - assert not dragon_backend._can_honor(run_req)[0] + with monkeypatch.context() as ctx: + dragon_backend = get_mock_backend(ctx, num_cpus=8, num_gpus=0) + + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + # specify GPU device w/no affinity + policy=DragonRunPolicy(gpu_affinity=[0]), + ) + can_honor, _ = dragon_backend._can_honor(run_req) + assert not can_honor -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_get_id(monkeypatch: pytest.MonkeyPatch) -> None: - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) step_id = next(dragon_backend._step_ids) assert step_id.endswith("0") assert step_id != next(dragon_backend._step_ids) -@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_view(monkeypatch: pytest.MonkeyPatch) -> None: - dragon_backend = get_mock_backend(monkeypatch) + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) set_mock_group_infos(monkeypatch, dragon_backend) hosts = dragon_backend.hosts + dragon_backend._prioritizer.increment(hosts[0]) - expected_message = textwrap.dedent(f"""\ + expected_msg = textwrap.dedent(f"""\ Dragon server backend update | Host | Status | |--------|----------| @@ -677,7 +643,7 @@ def test_view(monkeypatch: pytest.MonkeyPatch) -> None: | {hosts[1]} | Free | | {hosts[2]} | Free | | Step | Status | Hosts | Return codes | Num procs | - |----------|--------------|-------------|----------------|-------------| + |----------|--------------|-----------------|----------------|-------------| | abc123-1 | Running | {hosts[0]} | | 1 | | del999-2 | Cancelled | {hosts[1]} | -9 | 1 | | c101vz-3 | Completed | {hosts[1]},{hosts[2]} | 0 | 2 | @@ -686,6 +652,79 @@ def test_view(monkeypatch: pytest.MonkeyPatch) -> None: # get rid of white space to make the comparison easier actual_msg = dragon_backend.status_message.replace(" ", "") - expected_message = expected_message.replace(" ", "") + expected_msg = expected_msg.replace(" ", "") + + # ignore dashes in separators (hostname changes may cause column expansion) + while actual_msg.find("--") > -1: + actual_msg = actual_msg.replace("--", "-") + while expected_msg.find("--") > -1: + expected_msg = expected_msg.replace("--", "-") + + assert actual_msg == expected_msg + + +def test_can_honor_hosts_unavailable_hosts(monkeypatch: pytest.MonkeyPatch) -> None: + """Verify that requesting nodes with invalid names causes number of available + nodes check to fail due to valid # of named nodes being under num_nodes""" + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) + + # let's supply 2 invalid and 1 valid hostname + actual_hosts = list(dragon_backend._hosts) + actual_hosts[0] = f"x{actual_hosts[0]}" + actual_hosts[1] = f"x{actual_hosts[1]}" + + host_list = ",".join(actual_hosts) + + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, # <----- requesting 2 of 3 available nodes + hostlist=host_list, # <--- only one valid name available + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=DragonRunPolicy(), + ) + + can_honor, error_msg = dragon_backend._can_honor(run_req) + + # confirm the failure is indicated + assert not can_honor + # confirm failure message indicates number of nodes requested as cause + assert "named hosts" in error_msg + + +def test_can_honor_hosts_unavailable_hosts_ok(monkeypatch: pytest.MonkeyPatch) -> None: + """Verify that requesting nodes with invalid names causes number of available + nodes check to be reduced but still passes if enough valid named nodes are passed""" + dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0) + + # let's supply 2 valid and 1 invalid hostname + actual_hosts = list(dragon_backend._hosts) + actual_hosts[0] = f"x{actual_hosts[0]}" + + host_list = ",".join(actual_hosts) + + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, # <----- requesting 2 of 3 available nodes + hostlist=host_list, # <--- two valid names are available + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=DragonRunPolicy(), + ) + + can_honor, error_msg = dragon_backend._can_honor(run_req) - assert actual_msg == expected_message + # confirm the failure is indicated + assert can_honor, error_msg + # confirm failure message indicates number of nodes requested as cause + assert error_msg is None, error_msg diff --git a/tests/test_dragon_runsettings.py b/tests/test_dragon_runsettings.py index 34e8510e8..8c7600c74 100644 --- a/tests/test_dragon_runsettings.py +++ b/tests/test_dragon_runsettings.py @@ -96,3 +96,122 @@ def test_dragon_runsettings_gpu_affinity(): # ensure the value is not changed when we extend the list rs.run_args["gpu-affinity"] = "7,8,9" assert rs.run_args["gpu-affinity"] != ",".join(str(val) for val in exp_value) + + +def test_dragon_runsettings_hostlist_null(): + """Verify that passing a null hostlist is treated as a failure""" + rs = DragonRunSettings(exe="sleep", exe_args=["1"]) + + # baseline check that no host list exists + stored_list = rs.run_args.get("host-list", None) + assert stored_list is None + + with pytest.raises(ValueError) as ex: + rs.set_hostlist(None) + + assert "empty hostlist" in ex.value.args[0] + + +def test_dragon_runsettings_hostlist_empty(): + """Verify that passing an empty hostlist is treated as a failure""" + rs = DragonRunSettings(exe="sleep", exe_args=["1"]) + + # baseline check that no host list exists + stored_list = rs.run_args.get("host-list", None) + assert stored_list is None + + with pytest.raises(ValueError) as ex: + rs.set_hostlist([]) + + assert "empty hostlist" in ex.value.args[0] + + +@pytest.mark.parametrize("hostlist_csv", [" ", " , , , ", ",", ",,,"]) +def test_dragon_runsettings_hostlist_whitespace_handling(hostlist_csv: str): + """Verify that passing a hostlist with emptystring host names is treated as a failure""" + rs = DragonRunSettings(exe="sleep", exe_args=["1"]) + + # baseline check that no host list exists + stored_list = rs.run_args.get("host-list", None) + assert stored_list is None + + # empty string as hostname in list + with pytest.raises(ValueError) as ex: + rs.set_hostlist(hostlist_csv) + + assert "invalid names" in ex.value.args[0] + + +@pytest.mark.parametrize( + "hostlist_csv", [[" "], [" ", "", " ", " "], ["", " "], ["", "", "", ""]] +) +def test_dragon_runsettings_hostlist_whitespace_handling_list(hostlist_csv: str): + """Verify that passing a hostlist with emptystring host names contained in a list + is treated as a failure""" + rs = DragonRunSettings(exe="sleep", exe_args=["1"]) + + # baseline check that no host list exists + stored_list = rs.run_args.get("host-list", None) + assert stored_list is None + + # empty string as hostname in list + with pytest.raises(ValueError) as ex: + rs.set_hostlist(hostlist_csv) + + assert "invalid names" in ex.value.args[0] + + +def test_dragon_runsettings_hostlist_as_csv(): + """Verify that a hostlist is stored properly when passing in a CSV string""" + rs = DragonRunSettings(exe="sleep", exe_args=["1"]) + + # baseline check that no host list exists + stored_list = rs.run_args.get("host-list", None) + assert stored_list is None + + hostnames = ["host0", "host1", "host2", "host3", "host4"] + + # set the host list with ideal comma separated values + input0 = ",".join(hostnames) + + # set the host list with a string of comma separated values + # including extra whitespace + input1 = ", ".join(hostnames) + + for hosts_input in [input0, input1]: + rs.set_hostlist(hosts_input) + + stored_list = rs.run_args.get("host-list", None) + assert stored_list + + # confirm that all values from the original list are retrieved + split_stored_list = stored_list.split(",") + assert set(hostnames) == set(split_stored_list) + + +def test_dragon_runsettings_hostlist_as_csv(): + """Verify that a hostlist is stored properly when passing in a CSV string""" + rs = DragonRunSettings(exe="sleep", exe_args=["1"]) + + # baseline check that no host list exists + stored_list = rs.run_args.get("host-list", None) + assert stored_list is None + + hostnames = ["host0", "host1", "host2", "host3", "host4"] + + # set the host list with ideal comma separated values + input0 = ",".join(hostnames) + + # set the host list with a string of comma separated values + # including extra whitespace + input1 = ", ".join(hostnames) + + for hosts_input in [input0, input1]: + rs.set_hostlist(hosts_input) + + stored_list = rs.run_args.get("host-list", None) + assert stored_list + + # confirm that all values from the original list are retrieved + split_stored_list = stored_list.split(",") + assert set(hostnames) == set(split_stored_list) diff --git a/tests/test_dragon_step.py b/tests/test_dragon_step.py index 19f408e0b..f933fb7bc 100644 --- a/tests/test_dragon_step.py +++ b/tests/test_dragon_step.py @@ -73,12 +73,18 @@ def dragon_batch_step(test_dir: str) -> DragonBatchStep: cpu_affinities = [[], [0, 1, 2], [], [3, 4, 5, 6]] gpu_affinities = [[], [], [0, 1, 2], [3, 4, 5, 6]] + # specify 3 hostnames to select from but require only 2 nodes + num_nodes = 2 + hostnames = ["host1", "host2", "host3"] + # assign some unique affinities to each run setting instance for index, rs in enumerate(settings): if gpu_affinities[index]: rs.set_node_feature("gpu") rs.set_cpu_affinity(cpu_affinities[index]) rs.set_gpu_affinity(gpu_affinities[index]) + rs.set_hostlist(hostnames) + rs.set_nodes(num_nodes) steps = list( DragonStep(name_, test_dir, rs_) for name_, rs_ in zip(names, settings) @@ -374,6 +380,11 @@ def test_dragon_batch_step_write_request_file( cpu_affinities = [[], [0, 1, 2], [], [3, 4, 5, 6]] gpu_affinities = [[], [], [0, 1, 2], [3, 4, 5, 6]] + hostnames = ["host1", "host2", "host3"] + num_nodes = 2 + + # parse requests file path from the launch command + # e.g. dragon python launch_cmd = dragon_batch_step.get_launch_cmd() requests_file = get_request_path_from_batch_script(launch_cmd) @@ -392,3 +403,5 @@ def test_dragon_batch_step_write_request_file( assert run_request assert run_request.policy.cpu_affinity == cpu_affinities[index] assert run_request.policy.gpu_affinity == gpu_affinities[index] + assert run_request.nodes == num_nodes + assert run_request.hostlist == ",".join(hostnames) diff --git a/tests/test_node_prioritizer.py b/tests/test_node_prioritizer.py new file mode 100644 index 000000000..abb4624b6 --- /dev/null +++ b/tests/test_node_prioritizer.py @@ -0,0 +1,555 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import random +import threading +import typing as t + +import pytest + +from smartsim._core.launcher.dragon.pqueue import NodePrioritizer, PrioritizerFilter +from smartsim.error.errors import SmartSimError +from smartsim.log import get_logger + +# The tests in this file belong to the group_b group +pytestmark = pytest.mark.group_b + + +logger = get_logger(__name__) + + +class MockNode: + def __init__(self, hostname: str, num_cpus: int, num_gpus: int) -> None: + self.hostname = hostname + self.num_cpus = num_cpus + self.num_gpus = num_gpus + + +def mock_node_hosts( + num_cpu_nodes: int, num_gpu_nodes: int +) -> t.Tuple[t.List[MockNode], t.List[MockNode]]: + cpu_hosts = [f"cpu-node-{i}" for i in range(num_cpu_nodes)] + gpu_hosts = [f"gpu-node-{i}" for i in range(num_gpu_nodes)] + + return cpu_hosts, gpu_hosts + + +def mock_node_builder(num_cpu_nodes: int, num_gpu_nodes: int) -> t.List[MockNode]: + nodes = [] + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + + nodes.extend(MockNode(hostname, 4, 0) for hostname in cpu_hosts) + nodes.extend(MockNode(hostname, 4, 4) for hostname in gpu_hosts) + + return nodes + + +def test_node_prioritizer_init_null() -> None: + """Verify that the priorizer reports failures to send a valid node set + if a null value is passed""" + lock = threading.RLock() + with pytest.raises(SmartSimError) as ex: + NodePrioritizer(None, lock) + + assert "Missing" in ex.value.args[0] + + +def test_node_prioritizer_init_empty() -> None: + """Verify that the priorizer reports failures to send a valid node set + if an empty list is passed""" + lock = threading.RLock() + with pytest.raises(SmartSimError) as ex: + NodePrioritizer([], lock) + + assert "Missing" in ex.value.args[0] + + +@pytest.mark.parametrize( + "num_cpu_nodes,num_gpu_nodes", [(1, 1), (2, 1), (1, 2), (8, 4), (1000, 200)] +) +def test_node_prioritizer_init_ok(num_cpu_nodes: int, num_gpu_nodes: int) -> None: + """Verify that initialization with a valid node list results in the + appropriate cpu & gpu ref counts, and complete ref map""" + nodes = mock_node_builder(num_cpu_nodes, num_gpu_nodes) + + # perform prioritizer initialization + lock = threading.RLock() + p = NodePrioritizer(nodes, lock) + + # get a copy of all the expected host names + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + all_hosts = cpu_hosts + gpu_hosts + assert len(all_hosts) == num_cpu_nodes + num_gpu_nodes + + # verify tracking data is initialized correctly for all nodes + for hostname in all_hosts: + # show that the ref map is tracking the node + assert hostname in p._nodes + + tracking_info = p.get_tracking_info(hostname) + + # show that the node is created w/zero ref counts + assert tracking_info.num_refs == 0 + + # show that the node is created and marked as not dirty (unchanged) + # assert tracking_info.is_dirty == False + + # iterate through known cpu node keys and verify prioritizer initialization + for hostname in cpu_hosts: + # show that the device ref counters are appropriately assigned + cpu_ref = next((n for n in p._cpu_refs if n.hostname == hostname), None) + assert cpu_ref, "CPU-only node not found in cpu ref set" + + gpu_ref = next((n for n in p._gpu_refs if n.hostname == hostname), None) + assert not gpu_ref, "CPU-only node should not be found in gpu ref set" + + # iterate through known GPU node keys and verify prioritizer initialization + for hostname in gpu_hosts: + # show that the device ref counters are appropriately assigned + gpu_ref = next((n for n in p._gpu_refs if n.hostname == hostname), None) + assert gpu_ref, "GPU-only node not found in gpu ref set" + + cpu_ref = next((n for n in p._cpu_refs if n.hostname == hostname), None) + assert not cpu_ref, "GPU-only node should not be found in cpu ref set" + + # verify we have all hosts in the ref map + assert set(p._nodes.keys()) == set(all_hosts) + + # verify we have no extra hosts in ref map + assert len(p._nodes.keys()) == len(set(all_hosts)) + + +def test_node_prioritizer_direct_increment() -> None: + """Verify that performing the increment operation causes the expected + side effect on the intended records""" + + num_cpu_nodes, num_gpu_nodes = 32, 8 + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + nodes = mock_node_builder(num_cpu_nodes, num_gpu_nodes) + + exclude_index = 2 + exclude_host0 = cpu_hosts[exclude_index] + exclude_host1 = gpu_hosts[exclude_index] + exclusions = [exclude_host0, exclude_host1] + + lock = threading.RLock() + p = NodePrioritizer(nodes, lock) + + # let's increment each element in a predictable way and verify + for node in nodes: + if node.hostname in exclusions: + # expect 1 cpu and 1 gpu node at zero and not incremented + continue + + if node.num_gpus == 0: + num_increments = random.randint(0, num_cpu_nodes - 1) + else: + num_increments = random.randint(0, num_gpu_nodes - 1) + + # increment this node some random number of times + for _ in range(num_increments): + p.increment(node.hostname) + + # ... and verify the correct incrementing is applied + tracking_info = p.get_tracking_info(node.hostname) + assert tracking_info.num_refs == num_increments + + # verify the excluded cpu node was never changed + tracking_info0 = p.get_tracking_info(exclude_host0) + assert tracking_info0.num_refs == 0 + + # verify the excluded gpu node was never changed + tracking_info1 = p.get_tracking_info(exclude_host1) + assert tracking_info1.num_refs == 0 + + +def test_node_prioritizer_indirect_increment() -> None: + """Verify that performing the increment operation indirectly affects + each available node until we run out of nodes to return""" + + num_cpu_nodes, num_gpu_nodes = 8, 0 + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + nodes = mock_node_builder(num_cpu_nodes, num_gpu_nodes) + + lock = threading.RLock() + p = NodePrioritizer(nodes, lock) + + # verify starting state + for node in p._nodes.values(): + tracking_info = p.get_tracking_info(node.hostname) + + assert node.num_refs == 0 # <--- ref count starts at zero + assert tracking_info.num_refs == 0 # <--- ref count starts at zero + + # perform indirect + for node in p._nodes.values(): + tracking_info = p.get_tracking_info(node.hostname) + + # apply `next` operation and verify tracking info reflects new ref + node = p.next(PrioritizerFilter.CPU) + tracking_info = p.get_tracking_info(node.hostname) + + # verify side-effects + assert tracking_info.num_refs > 0 # <--- ref count should now be > 0 + + # we expect it to give back only "clean" nodes from next* + assert tracking_info.is_dirty == False # NOTE: this is "hidden" by protocol + + # every node should be incremented now. prioritizer shouldn't have anything to give + tracking_info = p.next(PrioritizerFilter.CPU) + assert tracking_info is None # <--- get_next shouldn't have any nodes to give + + +def test_node_prioritizer_indirect_decrement_availability() -> None: + """Verify that a node who is decremented (dirty) is made assignable + on a subsequent request""" + + num_cpu_nodes, num_gpu_nodes = 1, 0 + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + nodes = mock_node_builder(num_cpu_nodes, num_gpu_nodes) + + lock = threading.RLock() + p = NodePrioritizer(nodes, lock) + + # increment our only node... + p.increment(cpu_hosts[0]) + + tracking_info = p.next() + assert tracking_info is None, "No nodes should be assignable" + + # perform a decrement... + p.decrement(cpu_hosts[0]) + + # ... and confirm that the node is available again + tracking_info = p.next() + assert tracking_info is not None, "A node should be assignable" + + +def test_node_prioritizer_multi_increment() -> None: + """Verify that retrieving multiple nodes via `next_n` API correctly + increments reference counts and returns appropriate results""" + + num_cpu_nodes, num_gpu_nodes = 8, 0 + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + nodes = mock_node_builder(num_cpu_nodes, num_gpu_nodes) + + lock = threading.RLock() + p = NodePrioritizer(nodes, lock) + + # Mark some nodes as dirty to verify retrieval + p.increment(cpu_hosts[0]) + assert p.get_tracking_info(cpu_hosts[0]).num_refs > 0 + + p.increment(cpu_hosts[2]) + assert p.get_tracking_info(cpu_hosts[2]).num_refs > 0 + + p.increment(cpu_hosts[4]) + assert p.get_tracking_info(cpu_hosts[4]).num_refs > 0 + + # use next_n w/the minimum allowed value + all_tracking_info = p.next_n(1, PrioritizerFilter.CPU) # <---- next_n(1) + + # confirm the number requested is honored + assert len(all_tracking_info) == 1 + # ensure no unavailable node is returned + assert all_tracking_info[0].hostname not in [ + cpu_hosts[0], + cpu_hosts[2], + cpu_hosts[4], + ] + + # use next_n w/value that exceeds available number of open nodes + # 3 direct increments in setup, 1 out of next_n(1), 4 left + all_tracking_info = p.next_n(5, PrioritizerFilter.CPU) + + # confirm that no nodes are returned, even though 4 out of 5 requested are available + assert len(all_tracking_info) == 0 + + +def test_node_prioritizer_multi_increment_validate_n() -> None: + """Verify that retrieving multiple nodes via `next_n` API correctly + reports failures when the request size is above pool size""" + + num_cpu_nodes, num_gpu_nodes = 8, 0 + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + nodes = mock_node_builder(num_cpu_nodes, num_gpu_nodes) + + lock = threading.RLock() + p = NodePrioritizer(nodes, lock) + + # we have 8 total cpu nodes available... request too many nodes + all_tracking_info = p.next_n(9, PrioritizerFilter.CPU) + assert len(all_tracking_info) == 0 + + all_tracking_info = p.next_n(num_cpu_nodes * 1000, PrioritizerFilter.CPU) + assert len(all_tracking_info) == 0 + + +def test_node_prioritizer_indirect_direct_interleaved_increments() -> None: + """Verify that interleaving indirect and direct increments results in + expected ref counts""" + + num_cpu_nodes, num_gpu_nodes = 8, 4 + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + nodes = mock_node_builder(num_cpu_nodes, num_gpu_nodes) + + lock = threading.RLock() + p = NodePrioritizer(nodes, lock) + + # perform some set of non-popped increments + p.increment(gpu_hosts[1]) + p.increment(gpu_hosts[3]) + p.increment(gpu_hosts[3]) + + # increment 0th item 1x + p.increment(cpu_hosts[0]) + + # increment 3th item 2x + p.increment(cpu_hosts[3]) + p.increment(cpu_hosts[3]) + + # increment last item 3x + p.increment(cpu_hosts[7]) + p.increment(cpu_hosts[7]) + p.increment(cpu_hosts[7]) + + tracking_info = p.get_tracking_info(gpu_hosts[1]) + assert tracking_info.num_refs == 1 + + tracking_info = p.get_tracking_info(gpu_hosts[3]) + assert tracking_info.num_refs == 2 + + nodes = [n for n in p._nodes.values() if n.num_refs == 0 and n.num_gpus == 0] + + # we should skip the 0-th item in the heap due to direct increment + tracking_info = p.next(PrioritizerFilter.CPU) + assert tracking_info.num_refs == 1 + # confirm we get a cpu node + assert "cpu-node" in tracking_info.hostname + + # this should pull the next item right out + tracking_info = p.next(PrioritizerFilter.CPU) + assert tracking_info.num_refs == 1 + assert "cpu-node" in tracking_info.hostname + + # ensure we pull from gpu nodes and the 0th item is returned + tracking_info = p.next(PrioritizerFilter.GPU) + assert tracking_info.num_refs == 1 + assert "gpu-node" in tracking_info.hostname + + # we should step over the 3-th node on this iteration + tracking_info = p.next(PrioritizerFilter.CPU) + assert tracking_info.num_refs == 1 + assert "cpu-node" in tracking_info.hostname + + # and ensure that heap also steps over a direct increment + tracking_info = p.next(PrioritizerFilter.GPU) + assert tracking_info.num_refs == 1 + assert "gpu-node" in tracking_info.hostname + + # and another GPU request should return nothing + tracking_info = p.next(PrioritizerFilter.GPU) + assert tracking_info is None + + +def test_node_prioritizer_decrement_floor() -> None: + """Verify that repeatedly decrementing ref counts does not + allow negative ref counts""" + + num_cpu_nodes, num_gpu_nodes = 8, 4 + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + nodes = mock_node_builder(num_cpu_nodes, num_gpu_nodes) + + lock = threading.RLock() + p = NodePrioritizer(nodes, lock) + + # try a ton of decrements on all the items in the prioritizer + for _ in range(len(nodes) * 100): + index = random.randint(0, num_cpu_nodes - 1) + p.decrement(cpu_hosts[index]) + + index = random.randint(0, num_gpu_nodes - 1) + p.decrement(gpu_hosts[index]) + + for node in nodes: + tracking_info = p.get_tracking_info(node.hostname) + assert tracking_info.num_refs == 0 + + +@pytest.mark.parametrize("num_requested", [1, 2, 3]) +def test_node_prioritizer_multi_increment_subheap(num_requested: int) -> None: + """Verify that retrieving multiple nodes via `next_n` API correctly + increments reference counts and returns appropriate results + when requesting an in-bounds number of nodes""" + + num_cpu_nodes, num_gpu_nodes = 8, 0 + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + nodes = mock_node_builder(num_cpu_nodes, num_gpu_nodes) + + lock = threading.RLock() + p = NodePrioritizer(nodes, lock) + + # Mark some nodes as dirty to verify retrieval + p.increment(cpu_hosts[0]) + p.increment(cpu_hosts[2]) + p.increment(cpu_hosts[4]) + + hostnames = [cpu_hosts[0], cpu_hosts[1], cpu_hosts[2], cpu_hosts[3], cpu_hosts[5]] + + # request n == {num_requested} nodes from set of 3 available + all_tracking_info = p.next_n( + num_requested, + hosts=hostnames, + ) # <---- w/0,2,4 assigned, only 1,3,5 from hostnames can work + + # all parameterizations should result in a matching output size + assert len(all_tracking_info) == num_requested + + +def test_node_prioritizer_multi_increment_subheap_assigned() -> None: + """Verify that retrieving multiple nodes via `next_n` API does + not return anything when the number requested cannot be satisfied + by the given subheap due to prior assignment""" + + num_cpu_nodes, num_gpu_nodes = 8, 0 + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + nodes = mock_node_builder(num_cpu_nodes, num_gpu_nodes) + + lock = threading.RLock() + p = NodePrioritizer(nodes, lock) + + # Mark some nodes as dirty to verify retrieval + p.increment(cpu_hosts[0]) + p.increment(cpu_hosts[2]) + + hostnames = [ + cpu_hosts[0], + "x" + cpu_hosts[2], + ] # <--- we can't get 2 from 1 valid node name + + # request n == {num_requested} nodes from set of 3 available + num_requested = 2 + all_tracking_info = p.next_n(num_requested, hosts=hostnames) + + # w/0,2 assigned, nothing can be returned + assert len(all_tracking_info) == 0 + + +def test_node_prioritizer_empty_subheap_next_w_hosts() -> None: + """Verify that retrieving multiple nodes via `next_n` API does + not allow an empty host list""" + + num_cpu_nodes, num_gpu_nodes = 8, 0 + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + nodes = mock_node_builder(num_cpu_nodes, num_gpu_nodes) + + lock = threading.RLock() + p = NodePrioritizer(nodes, lock) + + # Mark some nodes as dirty to verify retrieval + p.increment(cpu_hosts[0]) + p.increment(cpu_hosts[2]) + + hostnames = [] + + # request n == {num_requested} nodes from set of 3 available + num_requested = 1 + with pytest.raises(ValueError) as ex: + p.next(hosts=hostnames) + + assert "No hostnames provided" == ex.value.args[0] + + +def test_node_prioritizer_empty_subheap_next_n_w_hosts() -> None: + """Verify that retrieving multiple nodes via `next_n` API does + not allow an empty host list""" + + num_cpu_nodes, num_gpu_nodes = 8, 0 + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + nodes = mock_node_builder(num_cpu_nodes, num_gpu_nodes) + + lock = threading.RLock() + p = NodePrioritizer(nodes, lock) + + # Mark some nodes as dirty to verify retrieval + p.increment(cpu_hosts[0]) + p.increment(cpu_hosts[2]) + + hostnames = [] + + # request n == {num_requested} nodes from set of 3 available + num_requested = 1 + with pytest.raises(ValueError) as ex: + p.next_n(num_requested, hosts=hostnames) + + assert "No hostnames provided" == ex.value.args[0] + + +@pytest.mark.parametrize("num_requested", [-100, -1, 0]) +def test_node_prioritizer_empty_subheap_next_n(num_requested: int) -> None: + """Verify that retrieving a node via `next_n` API does + not allow a request with num_items < 1""" + + num_cpu_nodes, num_gpu_nodes = 8, 0 + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + nodes = mock_node_builder(num_cpu_nodes, num_gpu_nodes) + + lock = threading.RLock() + p = NodePrioritizer(nodes, lock) + + # Mark some nodes as dirty to verify retrieval + p.increment(cpu_hosts[0]) + p.increment(cpu_hosts[2]) + + # request n == {num_requested} nodes from set of 3 available + with pytest.raises(ValueError) as ex: + p.next_n(num_requested) + + assert "Number of items requested" in ex.value.args[0] + + +@pytest.mark.parametrize("num_requested", [-100, -1, 0]) +def test_node_prioritizer_empty_subheap_next_n(num_requested: int) -> None: + """Verify that retrieving multiple nodes via `next_n` API does + not allow a request with num_items < 1""" + + num_cpu_nodes, num_gpu_nodes = 8, 0 + cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes) + nodes = mock_node_builder(num_cpu_nodes, num_gpu_nodes) + + lock = threading.RLock() + p = NodePrioritizer(nodes, lock) + + # Mark some nodes as dirty to verify retrieval + p.increment(cpu_hosts[0]) + p.increment(cpu_hosts[2]) + + hostnames = [cpu_hosts[0], cpu_hosts[2]] + + # request n == {num_requested} nodes from set of 3 available + with pytest.raises(ValueError) as ex: + p.next_n(num_requested, hosts=hostnames) + + assert "Number of items requested" in ex.value.args[0]