From edb994f8443222da547e0dbeed72c8a791381543 Mon Sep 17 00:00:00 2001 From: ankona <3595025+ankona@users.noreply.github.com> Date: Wed, 24 Jul 2024 10:27:35 -0500 Subject: [PATCH] checkpoint --- .../_core/launcher/dragon/dragonBackend.py | 99 +------------------ 1 file changed, 2 insertions(+), 97 deletions(-) diff --git a/smartsim/_core/launcher/dragon/dragonBackend.py b/smartsim/_core/launcher/dragon/dragonBackend.py index 910b6d6228..baa542eda2 100644 --- a/smartsim/_core/launcher/dragon/dragonBackend.py +++ b/smartsim/_core/launcher/dragon/dragonBackend.py @@ -46,6 +46,7 @@ import dragon.native.process_group as dragon_process_group import dragon.native.machine as dragon_machine +from smartsim._core.launcher.dragon.pqueue import NodePrioritizer from smartsim.error.errors import SmartSimError # pylint: enable=import-error @@ -197,6 +198,7 @@ def __init__(self, pid: int) -> None: self._view = DragonBackendView(self) logger.debug(self._view.host_desc) self._infra_ddict: t.Optional[dragon_ddict.DDict] = None + self._prioritizer = NodePrioritizer(self._nodes, self._queue_lock) @property def hosts(self) -> list[str]: @@ -218,103 +220,6 @@ def group_infos(self) -> dict[str, ProcessGroupInfo]: with self._queue_lock: return self._group_infos - def _initialize_reference_counters(self) -> None: - """Perform initialization of reference counters for nodes in the allocation.""" - for node in self._nodes: - # initialize all node counts to 0 and mark the entries "is_dirty=False" - tracking_info = [0, node.hostname, False] # use list for mutability - - self._ref_map[node.hostname] = tracking_info - - if node.num_gpus: - heapq.heappush(self._gpu_refs, tracking_info) - else: - heapq.heappush(self._cpu_refs, tracking_info) - - def _direct_increment_host(self, host: str) -> None: - """Directly increment the reference count of a given node and ensure the - ref counter is marked as dirty to trigger a reordering""" - with self._queue_lock: - tracking_info = self._ref_map[host] - tracking_info[0] += 1 - tracking_info[2] = True - - def _direct_decrement_host(self, host: str) -> None: - """Directly increment the reference count of a given node and ensure the - ref counter is marked as dirty to trigger a reordering""" - with self._queue_lock: - tracking_info = self._ref_map[host] - tracking_info[0] -= 1 - tracking_info[2] = True - - def _create_sub_heap(self, hosts: t.List[str]) -> t.List[_NodeRefCount]: - nodes_tracking_info: t.List[_NodeRefCount] = [] - - # Collect all the tracking info for the requested nodes... - for host in hosts: - tracking_info = self._ref_map[host] - nodes_tracking_info.append(tracking_info) - - # ... and use it to create a new heap from a specified subset of nodes - heapq.heapify(nodes_tracking_info) - - return nodes_tracking_info - - def _get_next_available_cpu_node_from_subset( - self, hosts: t.List[str] - ) -> _NodeRefCount: - # NOTE: subset ops do not guarantee that the sub heap is really a cpu/gpu node - # TODO: probably need to add a cpu|gpu boolean/enum to _NodeRefCount to simplify - # checking that the sub heap results are good to return - sub_heap = self._create_sub_heap(hosts) - return self._get_next_available_cpu_node(sub_heap) - - def _get_next_available_gpu_node_from_subset( - self, hosts: t.List[str] - ) -> _NodeRefCount: - sub_heap = self._create_sub_heap(hosts) - return self._get_next_available_gpu_node(sub_heap) - - def _get_next_available_node( - self, ref_counter: t.List[_NodeRefCount] - ) -> _NodeRefCount: - """Finds the next node w/the least amount of running processes and - ensures that any elements that were directly updated are updated in - the priority structure before being made available""" - tracking_info: t.Optional[_NodeRefCount] = None - - with self._queue_lock: - while not tracking_info: - tracking_info = heapq.heappop(ref_counter) - is_dirty = tracking_info[2] - - if is_dirty: - # put dirty items right back into the heap w/new priority and mark clean - tracking_info[2] = False - heapq.heappush(tracking_info) - else: - # increment the ref count and put back into heap - tracking_info[0] += 1 - heapq.heappush(tracking_info) - - return tracking_info - - def _get_next_available_cpu_node( - self, heap: t.Optional[t.List[_NodeRefCount]] = None - ) -> _NodeRefCount: - """Find the next CPU node available w/least amount of references""" - if heap is None: - heap = self._cpu_refs - return self._get_next_available_node(heap) # self._cpu_refs) - - def _get_next_available_gpu_node( - self, heap: t.Optional[t.List[_NodeRefCount]] = None - ) -> _NodeRefCount: - """Find the next GPU node available w/least amount of references""" - if heap is None: - heap = self._gpu_refs - return self._get_next_available_node(heap) # self._gpu_refs) - def _initialize_hosts(self) -> None: with self._queue_lock: self._hosts: t.List[str] = sorted(