Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
ankona committed Jul 24, 2024
1 parent f3662dc commit edb994f
Showing 1 changed file with 2 additions and 97 deletions.
99 changes: 2 additions & 97 deletions smartsim/_core/launcher/dragon/dragonBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import dragon.native.process_group as dragon_process_group
import dragon.native.machine as dragon_machine

from smartsim._core.launcher.dragon.pqueue import NodePrioritizer
from smartsim.error.errors import SmartSimError

# pylint: enable=import-error
Expand Down Expand Up @@ -197,6 +198,7 @@ def __init__(self, pid: int) -> None:
self._view = DragonBackendView(self)
logger.debug(self._view.host_desc)
self._infra_ddict: t.Optional[dragon_ddict.DDict] = None
self._prioritizer = NodePrioritizer(self._nodes, self._queue_lock)

@property
def hosts(self) -> list[str]:
Expand All @@ -218,103 +220,6 @@ def group_infos(self) -> dict[str, ProcessGroupInfo]:
with self._queue_lock:
return self._group_infos

def _initialize_reference_counters(self) -> None:
    """Seed the reference-count bookkeeping for every node in the allocation.

    Each node in ``self._nodes`` receives one mutable tracking entry laid out
    as ``[ref_count, hostname, is_dirty]`` and initialised to
    ``[0, hostname, False]``. The very same list object is stored in
    ``self._ref_map`` (keyed by hostname) and pushed onto ``self._gpu_refs``
    when the node reports a truthy ``num_gpus``, otherwise onto
    ``self._cpu_refs``.
    """
    for current in self._nodes:
        # A list rather than a tuple: the map and the heap must share one
        # entry that can be updated in place.
        entry = [0, current.hostname, False]
        self._ref_map[current.hostname] = entry

        target_heap = self._gpu_refs if current.num_gpus else self._cpu_refs
        heapq.heappush(target_heap, entry)

def _direct_increment_host(self, host: str) -> None:
    """Add one to the reference count tracked for ``host``.

    The entry is modified in place while holding ``self._queue_lock`` and is
    flagged dirty, which is the signal used to request a re-ordering of the
    priority structure.

    :param host: hostname whose tracking entry should be updated
    :raises KeyError: if ``host`` has no entry in ``self._ref_map``
    """
    with self._queue_lock:
        entry = self._ref_map[host]
        # index 0 is the reference count, index 2 is the dirty flag
        entry[0], entry[2] = entry[0] + 1, True

def _direct_decrement_host(self, host: str) -> None:
    """Directly decrement the reference count of a given node and ensure the
    ref counter is marked as dirty to trigger a reordering.

    The update is applied in place while holding ``self._queue_lock``. The
    count is not clamped, so it is the caller's responsibility to pair each
    decrement with an earlier increment.

    :param host: hostname whose tracking entry should be updated
    :raises KeyError: if ``host`` has no entry in ``self._ref_map``
    """
    with self._queue_lock:
        tracking_info = self._ref_map[host]
        tracking_info[0] -= 1  # index 0 holds the reference count
        tracking_info[2] = True  # index 2 holds the dirty flag

def _create_sub_heap(self, hosts: t.List[str]) -> t.List[_NodeRefCount]:
    """Build a new heap holding only the tracking entries of ``hosts``.

    The entries are looked up in ``self._ref_map`` and are the same objects
    stored there (no copies are made); only the returned list is new.

    :param hosts: hostnames to include in the heap
    :returns: a heap-ordered list of the matching tracking entries
    :raises KeyError: if any hostname has no entry in ``self._ref_map``
    """
    subset: t.List[_NodeRefCount] = [self._ref_map[name] for name in hosts]
    heapq.heapify(subset)
    return subset

def _get_next_available_cpu_node_from_subset(
    self, hosts: t.List[str]
) -> _NodeRefCount:
    """Pick the next CPU node, considering only the given ``hosts``.

    A temporary heap is built from the tracking entries of ``hosts`` and the
    selection is delegated to ``_get_next_available_cpu_node``.

    NOTE: nothing here verifies that the requested hosts really are CPU
    nodes; the sub heap contains whatever hosts were asked for.
    TODO: consider adding a cpu|gpu boolean/enum to _NodeRefCount so the
    result of a subset lookup can be validated before it is returned.

    :param hosts: hostnames eligible for selection
    :returns: the tracking entry chosen from the subset
    """
    return self._get_next_available_cpu_node(self._create_sub_heap(hosts))

def _get_next_available_gpu_node_from_subset(
    self, hosts: t.List[str]
) -> _NodeRefCount:
    """Pick the next GPU node, considering only the given ``hosts``.

    A temporary heap is built from the tracking entries of ``hosts`` and the
    selection is delegated to ``_get_next_available_gpu_node``. No check is
    made here that the requested hosts are GPU nodes.

    :param hosts: hostnames eligible for selection
    :returns: the tracking entry chosen from the subset
    """
    return self._get_next_available_gpu_node(self._create_sub_heap(hosts))

def _get_next_available_node(
    self, ref_counter: t.List[_NodeRefCount]
) -> _NodeRefCount:
    """Find the node with the fewest references and claim it.

    Entries whose counts were changed directly (and are therefore flagged
    dirty) are re-prioritised before a node is chosen. The chosen entry has
    its reference count incremented and is put back into ``ref_counter``.

    :param ref_counter: heap of ``[ref_count, hostname, is_dirty]`` entries
    :returns: the tracking entry that was selected (count already incremented)
    :raises IndexError: if ``ref_counter`` is empty
    """
    with self._queue_lock:
        # Direct updates mutate entries in place, which can break the heap
        # invariant anywhere in the list -- not only at the root. Popping and
        # re-pushing the root alone cannot repair that, so rebuild the heap
        # whenever at least one entry was flagged.
        needs_reorder = False
        for entry in ref_counter:
            if entry[2]:
                entry[2] = False
                needs_reorder = True

        if needs_reorder:
            heapq.heapify(ref_counter)

        tracking_info = heapq.heappop(ref_counter)

        # claim the node and return it to the heap under its new priority
        tracking_info[0] += 1
        heapq.heappush(ref_counter, tracking_info)

    return tracking_info

def _get_next_available_cpu_node(
    self, heap: t.Optional[t.List[_NodeRefCount]] = None
) -> _NodeRefCount:
    """Select the CPU node that currently has the fewest references.

    :param heap: heap to select from; when ``None`` the backend-wide
        ``self._cpu_refs`` heap is used
    :returns: the tracking entry chosen by ``_get_next_available_node``
    """
    candidates = self._cpu_refs if heap is None else heap
    return self._get_next_available_node(candidates)

def _get_next_available_gpu_node(
    self, heap: t.Optional[t.List[_NodeRefCount]] = None
) -> _NodeRefCount:
    """Select the GPU node that currently has the fewest references.

    :param heap: heap to select from; when ``None`` the backend-wide
        ``self._gpu_refs`` heap is used
    :returns: the tracking entry chosen by ``_get_next_available_node``
    """
    candidates = self._gpu_refs if heap is None else heap
    return self._get_next_available_node(candidates)

def _initialize_hosts(self) -> None:
with self._queue_lock:
self._hosts: t.List[str] = sorted(
Expand Down

0 comments on commit edb994f

Please sign in to comment.