diff --git a/smartsim/_core/launcher/dragon/dragonBackend.py b/smartsim/_core/launcher/dragon/dragonBackend.py index 0fa356f7d1..2b2a18c7cc 100644 --- a/smartsim/_core/launcher/dragon/dragonBackend.py +++ b/smartsim/_core/launcher/dragon/dragonBackend.py @@ -25,7 +25,6 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import collections import functools -import heapq import itertools import time import typing as t @@ -47,7 +46,6 @@ import dragon.native.machine as dragon_machine from smartsim._core.launcher.dragon.pqueue import NodePrioritizer -from smartsim.error.errors import SmartSimError # pylint: enable=import-error # isort: on @@ -74,7 +72,7 @@ # tracking structure for [num_refs, node_name, is_dirty] -_NodeRefCount = t.List[int, str, bool] +_NodeRefCount = t.List[t.Union[int, str, bool]] class DragonStatus(str, Enum): @@ -206,14 +204,13 @@ def hosts(self) -> list[str]: return self._hosts @property - def allocated_hosts(self) -> dict[str, str]: - with self._queue_lock: - return self._allocated_hosts + def assigned_steps(self) -> dict[str, str]: + return self._assigned_steps - # @property - # def free_hosts(self) -> t.Deque[str]: - # with self._queue_lock: - # return self._free_hosts + @property + def free_hosts(self) -> t.List[str]: + with self._queue_lock: + return [str(item[1]) for item in self._prioritizer.unassigned()] @property def group_infos(self) -> dict[str, ProcessGroupInfo]: @@ -232,7 +229,7 @@ def _initialize_hosts(self) -> None: # """List of hosts available in allocation""" # self._free_hosts: t.Deque[str] = collections.deque(self._hosts) """List of hosts on which steps can be launched""" - self._allocated_hosts: t.Dict[str, str] = {} + self._assigned_steps: t.Dict[str, str] = {} """Mapping of hosts on which a step is already running to step ID""" self._ref_map: t.Dict[str, _NodeRefCount] = {} @@ -242,8 +239,6 @@ def _initialize_hosts(self) -> None: self._gpu_refs: t.List[_NodeRefCount] = [] """Track reference counts to GPU nodes""" - self._initialize_reference_counters() - def __str__(self) -> str: return self.status_message @@ -322,9 +317,11 @@ def _can_honor_hosts( message += f"but only {len(self._hosts)} nodes are available." 
return False, message - requested_hosts: t.Set[str] = set(request.policy.hostlist) - if not requested_hosts: - return True, None + requested_hosts: t.Set[str] = set() + if request.hostlist: + requested_hosts = set(request.hostlist) + if not requested_hosts: + return True, None all_hosts = set(self._hosts) valid_hosts = all_hosts.intersection(requested_hosts) @@ -414,18 +411,18 @@ def _allocate_step( if num_hosts <= 0 or num_hosts > len(self._hosts): return None - if request.policy.hostlist: + if request.hostlist: tracking_info = self._prioritizer.next_n_from( - num_hosts, request.policy.hostlist + num_hosts, [host for host in request.hostlist.split(",") if host] ) else: tracking_info = self._prioritizer.next_n(num_hosts) - + # give back the list of node names - to_allocate = [info[1] for info in tracking_info] - + to_allocate = [str(info[1]) for info in tracking_info] + for host in to_allocate: - self._allocated_hosts[host] = step_id + self._assigned_steps[host] = step_id return to_allocate @@ -715,7 +712,7 @@ def _refresh_statuses(self) -> None: for host in group_info.hosts: logger.debug(f"Releasing host {host}") try: - self._allocated_hosts.pop(host) + self._assigned_steps.pop(host) self._prioritizer.decrement(host) except KeyError: logger.error(f"Tried to free a non-allocated host: {host}") diff --git a/smartsim/_core/launcher/dragon/pqueue.py b/smartsim/_core/launcher/dragon/pqueue.py index b24a01ed3a..41e39e2ed7 100644 --- a/smartsim/_core/launcher/dragon/pqueue.py +++ b/smartsim/_core/launcher/dragon/pqueue.py @@ -30,14 +30,13 @@ from smartsim.error.errors import SmartSimError - # tracking structure for [num_refs, node_name, is_dirty] -_NodeRefCount = t.List[t.Union[int, str, bool]] +_NodeRefCount = t.List[t.Union[int, str]] class PrioritizerFilter(str, enum.Enum): - CPU: str = enum.auto() - GPU: str = enum.auto() + CPU = enum.auto() + GPU = enum.auto() class Node(t.Protocol): @@ -54,7 +53,6 @@ def hostname(self) -> str: ... 
class NodePrioritizer: - def __init__(self, nodes: t.List[Node], lock: threading.RLock) -> None: if not nodes: raise SmartSimError("Missing nodes to prioritize") @@ -76,7 +74,11 @@ def _initialize_reference_counters(self, nodes: t.List[Node]) -> None: """Perform initialization of reference counters for nodes in the allocation.""" for node in nodes: # initialize all node counts to 0 and mark the entries "is_dirty=False" - tracking_info = [0, node.hostname, False] # use list for mutability + tracking_info: _NodeRefCount = [ + 0, + node.hostname, + 0, + ] # use list for mutability self._ref_map[node.hostname] = tracking_info @@ -94,8 +96,9 @@ def increment(self, host: str) -> None: ref counter is marked as dirty to trigger a reordering on retrieval""" with self._lock: tracking_info = self._ref_map[host] - tracking_info[0] += 1 - tracking_info[2] = True + ref_count, *_ = tracking_info + tracking_info[0] = int(ref_count) + 1 + tracking_info[2] = 1 def get_tracking_info(self, host: str) -> _NodeRefCount: return self._ref_map[host] @@ -105,8 +108,8 @@ def decrement(self, host: str) -> None: ref counter is marked as dirty to trigger a reordering""" with self._lock: tracking_info = self._ref_map[host] - tracking_info[0] = max(tracking_info[0] - 1, 0) - tracking_info[2] = True + tracking_info[0] = max(int(tracking_info[0]) - 1, 0) + tracking_info[2] = 1 def _create_sub_heap(self, hosts: t.List[str]) -> t.List[_NodeRefCount]: nodes_tracking_info: t.List[_NodeRefCount] = [] @@ -121,17 +124,17 @@ def _create_sub_heap(self, hosts: t.List[str]) -> t.List[_NodeRefCount]: return nodes_tracking_info - def next_from(self, hosts: t.List[str]) -> _NodeRefCount: + def next_from(self, hosts: t.List[str]) -> t.Optional[_NodeRefCount]: """Return the next node available given a set of desired hosts""" sub_heap = self._create_sub_heap(hosts) return self._get_next_available_node(sub_heap) - def next_n_from(self, n: int, hosts: t.List[str]) -> t.List[_NodeRefCount]: + def next_n_from(self, num_items: int, hosts: t.List[str]) -> t.List[_NodeRefCount]: """Return the next N available nodes given a set of desired hosts""" sub_heap = self._create_sub_heap(hosts) - return self._get_next_n_available_nodes(n, sub_heap) + return self._get_next_n_available_nodes(num_items, sub_heap) - def open( + def unassigned( self, heap: t.Optional[t.List[_NodeRefCount]] = None ) -> t.List[_NodeRefCount]: """Helper method to identify the nodes that are currently not assigned""" @@ -140,7 +143,7 @@ def open( return [node for node in heap if node[0] == 0] - def closed( + def assigned( self, heap: t.Optional[t.List[_NodeRefCount]] = None ) -> t.List[_NodeRefCount]: """Helper method to identify the nodes that are currently assigned""" @@ -150,25 +153,28 @@ def closed( return [node for node in heap if node[0] == 1] def _check_satisfiable_n( - self, n: int, heap: t.Optional[t.List[_NodeRefCount]] = None + self, num_items: int, heap: t.Optional[t.List[_NodeRefCount]] = None ) -> bool: """Validates that a request for some number of nodes `n` can be satisfied by the prioritizer given the set of nodes available""" num_nodes = len(self._ref_map.keys()) - if n < 1: + if num_items < 1: # msg = f"Unable to satisfy request for less than 1 node" # raise ValueError(msg) return False - if num_nodes < n: + if num_nodes < num_items: # msg = f"Unable to satisfy request for {n} nodes from pool of {num_nodes}" # raise ValueError(msg) return False - num_open = len(self.open()) if heap is None else len(self.open(heap)) - if num_open < n: - # msg = f"Unable to 
satisfy request for {n} nodes from {num_nodes} available" + num_open = ( + len(self.unassigned()) if heap is None else len(self.unassigned(heap)) + ) + if num_open < num_items: + # msg = "Unable to satisfy request for " + # f"{n} nodes from {num_nodes} available" # raise ValueError(msg) return False @@ -189,13 +195,13 @@ def _get_next_available_node( while is_dirty: if is_dirty: # mark dirty items clean and place back into heap to be sorted - tracking_info[2] = False + tracking_info[2] = 0 heapq.heappush(heap, tracking_info) tracking_info = heapq.heappop(heap) is_dirty = tracking_info[2] - original_ref_count = tracking_info[0] + original_ref_count = int(tracking_info[0]) if original_ref_count == 0: # increment the ref count before putting back onto heap tracking_info[0] = original_ref_count + 1 @@ -210,8 +216,8 @@ def _get_next_available_node( def _get_next_n_available_nodes( self, - n: int, - heap: t.Optional[t.List[_NodeRefCount]], + num_items: int, + heap: t.List[_NodeRefCount], ) -> t.List[_NodeRefCount]: """Find the next N available nodes w/least amount of references using the supplied filter to target a specific node capability @@ -219,37 +225,42 @@ def _get_next_n_available_nodes( :returns: Collection of reserved nodes""" next_nodes: t.List[_NodeRefCount] = [] - if not self._check_satisfiable_n(n, heap): + if not self._check_satisfiable_n(num_items, heap): return next_nodes # TODO: convert return type to an actual node or the hostname next_node: t.Optional[_NodeRefCount] = self._get_next_available_node(heap) - while len(next_nodes) < n and next_node is not None: + while len(next_nodes) < num_items and next_node is not None: next_nodes.append(next_node) next_node = self._get_next_available_node(heap) return next_nodes - def next(self, filter: PrioritizerFilter = PrioritizerFilter.CPU) -> _NodeRefCount: + def next( + self, filter_on: PrioritizerFilter = PrioritizerFilter.CPU + ) -> t.Optional[_NodeRefCount]: """Find the next node available w/least amount of references using the supplied filter to target a specific node capability""" heap = self._cpu_refs - if filter == PrioritizerFilter.GPU: + if filter_on == PrioritizerFilter.GPU: heap = self._gpu_refs # TODO: convert return type to an actual node or the hostname - return self._get_next_available_node(heap) + if node := self._get_next_available_node(heap): + return node + + return None def next_n( - self, n: int = 1, filter: PrioritizerFilter = PrioritizerFilter.CPU + self, num_items: int = 1, filter_on: PrioritizerFilter = PrioritizerFilter.CPU ) -> t.List[_NodeRefCount]: """Find the next N available nodes w/least amount of references using the supplied filter to target a specific node capability :param n: number of nodes to reserve :returns: Collection of reserved nodes""" heap = self._cpu_refs - if filter == PrioritizerFilter.GPU: + if filter_on == PrioritizerFilter.GPU: heap = self._gpu_refs - return self._get_next_n_available_nodes(n, heap) + return self._get_next_n_available_nodes(num_items, heap) diff --git a/smartsim/_core/launcher/step/dragonStep.py b/smartsim/_core/launcher/step/dragonStep.py index 3a3bb5d8f0..581256c574 100644 --- a/smartsim/_core/launcher/step/dragonStep.py +++ b/smartsim/_core/launcher/step/dragonStep.py @@ -167,9 +167,11 @@ def _write_request_file(self) -> str: run_settings = t.cast(DragonRunSettings, step.step_settings) run_args = run_settings.run_args env = run_settings.env_vars - nodes = int(run_args.get("nodes", None) or 1) + nodes = int( + run_args.get("nodes", None) or 1 + ) # todo: is this the 
same use as my new host-list? tasks_per_node = int(run_args.get("tasks-per-node", None) or 1) - hosts = str(run_args.get("host-list", "")) + # hosts = str(run_args.get("host-list", "")) policy = DragonRunPolicy.from_run_args(run_args) diff --git a/smartsim/_core/schemas/dragonRequests.py b/smartsim/_core/schemas/dragonRequests.py index 894946829a..960f23e24f 100644 --- a/smartsim/_core/schemas/dragonRequests.py +++ b/smartsim/_core/schemas/dragonRequests.py @@ -72,7 +72,7 @@ def from_run_args( # list[str] converted to comma-separated str must split into a list[str] hosts: t.List[str] = [] - if hostlist_value := run_args.get("host-list", None): + if hostlist_value := str(run_args.get("host-list", "")): hosts = [x for x in hostlist_value.split(",") if x] try: diff --git a/tests/test_dragon_run_request.py b/tests/test_dragon_run_request.py index 94c17c222a..1a87213ddd 100644 --- a/tests/test_dragon_run_request.py +++ b/tests/test_dragon_run_request.py @@ -270,8 +270,8 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None: assert dragon_backend._running_steps == [step_id] assert len(dragon_backend._queued_steps) == 0 assert len(dragon_backend._free_hosts) == 1 - assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id - assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id + assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id + assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id monkeypatch.setattr( dragon_backend._group_infos[step_id].process_group, "status", "Running" @@ -282,8 +282,8 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None: assert dragon_backend._running_steps == [step_id] assert len(dragon_backend._queued_steps) == 0 assert len(dragon_backend._free_hosts) == 1 - assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id - assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id + assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id + assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id dragon_backend._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED @@ -367,8 +367,8 @@ def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None: assert dragon_backend._running_steps == [step_id] assert len(dragon_backend._queued_steps) == 0 assert len(dragon_backend._free_hosts) == 1 - assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id - assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id + assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id + assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id monkeypatch.setattr( dragon_backend._group_infos[step_id].process_group, "status", "Running" @@ -379,8 +379,8 @@ def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None: assert dragon_backend._running_steps == [step_id] assert len(dragon_backend._queued_steps) == 0 assert len(dragon_backend._free_hosts) == 1 - assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id - assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id + assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id + assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id dragon_backend._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED @@ -433,7 +433,7 @@ def test_stop_request(monkeypatch: pytest.MonkeyPatch) -> None: == 
SmartSimStatus.STATUS_CANCELLED ) - assert len(dragon_backend._allocated_hosts) == 0 + assert len(dragon_backend._assigned_steps) == 0 assert len(dragon_backend._free_hosts) == 3 diff --git a/tests/test_node_prioritizer.py b/tests/test_node_prioritizer.py index 092e20f9cb..239dc679b5 100644 --- a/tests/test_node_prioritizer.py +++ b/tests/test_node_prioritizer.py @@ -23,11 +23,12 @@ # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import pytest import random import threading import typing as t +import pytest + from smartsim._core.launcher.dragon.pqueue import NodePrioritizer, PrioritizerFilter from smartsim.error.errors import SmartSimError from smartsim.log import get_logger
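
A note on the prioritizer's data structure: pqueue.py keeps one mutable tracking record per host, [num_refs, node_name, is_dirty], and relies on heapq for least-referenced-first ordering. Because increment() and decrement() mutate records in place while those records sit inside the heap, the heap invariant is repaired lazily: mutated records are flagged dirty, and retrieval re-pushes them until a clean minimum surfaces. Below is a minimal, self-contained sketch of that pattern under the assumption of a single heap; SimpleNodePrioritizer and next_unassigned are illustrative names only, and the real NodePrioritizer additionally splits CPU/GPU heaps, builds sub-heaps from requested host lists, and takes an externally supplied RLock.

# A minimal sketch of the dirty-flag min-heap pattern behind NodePrioritizer.
# SimpleNodePrioritizer and next_unassigned are illustrative names, not part
# of the SmartSim API.
import heapq
import threading
import typing as t

# mirrors the PR's tracking structure: [num_refs, node_name, is_dirty]
_Tracking = t.List[t.Union[int, str]]


class SimpleNodePrioritizer:
    def __init__(self, hostnames: t.List[str]) -> None:
        self._lock = threading.RLock()
        # every host starts with zero references and a clean (0) dirty flag
        self._ref_map: t.Dict[str, _Tracking] = {
            host: [0, host, 0] for host in hostnames
        }
        self._heap: t.List[_Tracking] = list(self._ref_map.values())
        heapq.heapify(self._heap)

    def increment(self, host: str) -> None:
        # mutate the record in place and flag it dirty; the heap invariant
        # is repaired lazily the next time a node is retrieved
        with self._lock:
            info = self._ref_map[host]
            info[0] = int(info[0]) + 1
            info[2] = 1

    def next_unassigned(self) -> t.Optional[str]:
        # pop until a clean record surfaces, re-pushing dirty records so
        # they are re-sorted; reserve and return the host if it is unused
        with self._lock:
            while self._heap:
                info = heapq.heappop(self._heap)
                if info[2]:
                    info[2] = 0
                    heapq.heappush(self._heap, info)
                    continue
                if int(info[0]) == 0:
                    info[0] = 1  # reserve the node before re-inserting
                    heapq.heappush(self._heap, info)
                    return str(info[1])
                # least-referenced clean node is busy: nothing is free
                heapq.heappush(self._heap, info)
                return None
        return None


if __name__ == "__main__":
    p = SimpleNodePrioritizer(["node1", "node2", "node3"])
    assert p.next_unassigned() == "node1"
    p.increment("node2")
    assert p.next_unassigned() == "node3"
    assert p.next_unassigned() is None

The payoff of the dirty flag is that increment/decrement stay cheap in-place writes, and the reordering cost is deferred to retrieval, rather than rebuilding the heap on every reference-count change.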
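
A note on host-list parsing: DragonRunPolicy.from_run_args treats run_args["host-list"] as a comma-separated string and filters out empty entries. A minimal sketch of that round trip follows; parse_hostlist is an illustrative helper, not a SmartSim function. The `or ""` guard is an addition in this sketch: the PR's `str(run_args.get("host-list", ""))` would, if the key is present but set to None, yield the truthy literal string "None".

# Sketch of the comma-separated host-list round trip performed in
# DragonRunPolicy.from_run_args; parse_hostlist is an illustrative helper,
# not a SmartSim function.
import typing as t


def parse_hostlist(
    run_args: t.Dict[str, t.Union[str, int, None]]
) -> t.List[str]:
    # `or ""` is an extra guard added in this sketch: without it, a key that
    # is present but set to None would become the literal string "None"
    hostlist_value = str(run_args.get("host-list", "") or "")
    # drop empty entries produced by stray commas, e.g. "node1,,node2"
    return [x for x in hostlist_value.split(",") if x]


assert parse_hostlist({"host-list": "node1,node2,,node3"}) == [
    "node1",
    "node2",
    "node3",
]
assert parse_hostlist({"host-list": None}) == []
assert parse_hostlist({}) == []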